IGNITE-61 - Direct marshalling
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/89977d7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/89977d7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/89977d7b Branch: refs/heads/ignite-82 Commit: 89977d7bcfce37993646feea48dfedf5b6e6467f Parents: b711371 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun Feb 8 15:01:45 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun Feb 8 15:01:45 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectByteBufferStream.java | 35 +++++++++++++++++--- .../managers/communication/GridIoManager.java | 17 ++++------ .../processors/cache/GridCacheIoManager.java | 26 ++------------- .../continuous/GridContinuousProcessor.java | 6 ---- .../processors/fs/GridGgfsDeleteWorker.java | 8 +---- 5 files changed, 40 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index 6b33fe3..9a0c46e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.direct; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -568,9 +569,13 @@ public class DirectByteBufferStream { Type itemType = type(itemCls); while (it.hasNext() || cur != NULL) { - if (cur == NULL) + if (cur == NULL) { cur = it.next(); + if (itemType == Type.MSG) + cur = ((MessageAdapter)cur).clone(); + } + write(itemType, cur); if (!lastFinished) @@ -599,9 +604,13 @@ public class DirectByteBufferStream { Type itemType = type(itemCls); while (it.hasNext() || cur != NULL) { - if (cur == NULL) + if (cur == NULL) { cur = it.next(); + if (itemType == Type.MSG) + cur = ((MessageAdapter)cur).clone(); + } + write(itemType, cur); if (!lastFinished) @@ -632,10 +641,28 @@ public class DirectByteBufferStream { Type valType = type(valCls); while (it.hasNext() || cur != NULL) { - if (cur == NULL) + Map.Entry<K, V> e; + + if (cur == NULL) { cur = it.next(); - Map.Entry<K, V> e = (Map.Entry<K, V>)cur; + e = (Map.Entry<K, V>)cur; + + if (keyType == Type.MSG || valType == Type.MSG) { + K k = e.getKey(); + V v = e.getValue(); + + if (k != null && keyType == Type.MSG) + k = (K)((MessageAdapter)k).clone(); + + if (v != null && valType == Type.MSG) + v = (V)((MessageAdapter)v).clone(); + + cur = e = F.t(k, v); + } + } + else + e = (Map.Entry<K, V>)cur; if (!keyDone) { write(keyType, e.getKey()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 6902617..abb83bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -960,8 +960,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert msg != null; assert plc != null; - GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); - if (locNodeId.equals(node.id())) { assert plc != P2P_POOL; @@ -970,12 +968,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (commLsnr == null) throw new IgniteCheckedException("Trying to send message when grid is not fully started."); + GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); + if (ordered) processOrderedMessage(locNodeId, ioMsg, plc, null); else processRegularMessage0(ioMsg, locNodeId); } else { + GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg.clone(), ordered, timeout, skipOnTimeout); + ioMsg.setWriter(writerFactory.writer()); if (topicOrd < 0) @@ -1302,15 +1304,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // Small optimization, as communication SPIs may have lighter implementation for sending // messages to one node vs. many. if (!nodes.isEmpty()) { - boolean first = true; - - for (ClusterNode node : nodes) { - MessageAdapter msg0 = first ? msg : msg.clone(); - - first = false; - - send(node, topic, topicOrd, msg0, plc, ordered, timeout, skipOnTimeout); - } + for (ClusterNode node : nodes) + send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 69d151e..c13be53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -358,23 +358,12 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']'); int cnt = 0; - boolean first = true; while (cnt <= retryCnt) { try { cnt++; - GridCacheMessage<K, V> msg0; - - if (first) { - msg0 = msg; - - first = false; - } - else - msg0 = (GridCacheMessage<K, V>)msg.clone(); - - cctx.gridIO().send(node, TOPIC_CACHE, msg0, plc); + cctx.gridIO().send(node, TOPIC_CACHE, msg, plc); return; } @@ -426,7 +415,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V final Collection<UUID> leftIds = new GridLeanSet<>(); int cnt = 0; - boolean first = true; while (cnt < retryCnt) { try { @@ -436,17 +424,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V } }); - GridCacheMessage<K, V> msg0; - - if (first) { - msg0 = msg; - - first = false; - } - else - msg0 = (GridCacheMessage<K, V>)msg.clone(); - - cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL); + cctx.gridIO().send(nodesView, TOPIC_CACHE, msg, SYSTEM_POOL); boolean added = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 1fd9571..bf24a85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -1132,13 +1132,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (msg.data() != null && (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id()))) msg.dataBytes(marsh.marshal(msg.data())); - boolean first = true; - for (ClusterNode node : nodes) { - msg = first ? msg : (GridContinuousMessage)msg.clone(); - - first = false; - int cnt = 0; while (cnt <= retryCnt) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java index f0eabc6..1842403 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java @@ -332,15 +332,9 @@ public class GridGgfsDeleteWorker extends GridGgfsThread { Collection<ClusterNode> nodes = meta.metaCacheNodes(); - boolean first = true; - for (ClusterNode node : nodes) { - GridGgfsCommunicationMessage msg0 = first ? msg : (GridGgfsCommunicationMessage)msg.clone(); - - first = false; - try { - ggfsCtx.send(node, topic, msg0, GridIoPolicy.SYSTEM_POOL); + ggfsCtx.send(node, topic, msg, GridIoPolicy.SYSTEM_POOL); } catch (IgniteCheckedException e) { U.warn(log, "Failed to send GGFS delete message to node [nodeId=" + node.id() +