IGNITE-61 - Direct marshalling

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/89977d7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/89977d7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/89977d7b

Branch: refs/heads/ignite-82
Commit: 89977d7bcfce37993646feea48dfedf5b6e6467f
Parents: b711371
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sun Feb 8 15:01:45 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sun Feb 8 15:01:45 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java | 35 +++++++++++++++++---
 .../managers/communication/GridIoManager.java   | 17 ++++------
 .../processors/cache/GridCacheIoManager.java    | 26 ++-------------
 .../continuous/GridContinuousProcessor.java     |  6 ----
 .../processors/fs/GridGgfsDeleteWorker.java     |  8 +----
 5 files changed, 40 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index 6b33fe3..9a0c46e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.direct;
 
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -568,9 +569,13 @@ public class DirectByteBufferStream {
             Type itemType = type(itemCls);
 
             while (it.hasNext() || cur != NULL) {
-                if (cur == NULL)
+                if (cur == NULL) {
                     cur = it.next();
 
+                    if (itemType == Type.MSG)
+                        cur = ((MessageAdapter)cur).clone();
+                }
+
                 write(itemType, cur);
 
                 if (!lastFinished)
@@ -599,9 +604,13 @@ public class DirectByteBufferStream {
             Type itemType = type(itemCls);
 
             while (it.hasNext() || cur != NULL) {
-                if (cur == NULL)
+                if (cur == NULL) {
                     cur = it.next();
 
+                    if (itemType == Type.MSG)
+                        cur = ((MessageAdapter)cur).clone();
+                }
+
                 write(itemType, cur);
 
                 if (!lastFinished)
@@ -632,10 +641,28 @@ public class DirectByteBufferStream {
             Type valType = type(valCls);
 
             while (it.hasNext() || cur != NULL) {
-                if (cur == NULL)
+                Map.Entry<K, V> e;
+
+                if (cur == NULL) {
                     cur = it.next();
 
-                Map.Entry<K, V> e = (Map.Entry<K, V>)cur;
+                    e = (Map.Entry<K, V>)cur;
+
+                    if (keyType == Type.MSG || valType == Type.MSG) {
+                        K k = e.getKey();
+                        V v = e.getValue();
+
+                        if (k != null && keyType == Type.MSG)
+                            k = (K)((MessageAdapter)k).clone();
+
+                        if (v != null && valType == Type.MSG)
+                            v = (V)((MessageAdapter)v).clone();
+
+                        cur = e = F.t(k, v);
+                    }
+                }
+                else
+                    e = (Map.Entry<K, V>)cur;
 
                 if (!keyDone) {
                     write(keyType, e.getKey());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6902617..abb83bf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -960,8 +960,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         assert msg != null;
         assert plc != null;
 
-        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, 
ordered, timeout, skipOnTimeout);
-
         if (locNodeId.equals(node.id())) {
             assert plc != P2P_POOL;
 
@@ -970,12 +968,16 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             if (commLsnr == null)
                 throw new IgniteCheckedException("Trying to send message when 
grid is not fully started.");
 
+            GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, 
ordered, timeout, skipOnTimeout);
+
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
             else
                 processRegularMessage0(ioMsg, locNodeId);
         }
         else {
+            GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, 
msg.clone(), ordered, timeout, skipOnTimeout);
+
             ioMsg.setWriter(writerFactory.writer());
 
             if (topicOrd < 0)
@@ -1302,15 +1304,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             // Small optimization, as communication SPIs may have lighter 
implementation for sending
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
-                boolean first = true;
-
-                for (ClusterNode node : nodes) {
-                    MessageAdapter msg0 = first ? msg : msg.clone();
-
-                    first = false;
-
-                    send(node, topic, topicOrd, msg0, plc, ordered, timeout, 
skipOnTimeout);
-                }
+                for (ClusterNode node : nodes)
+                    send(node, topic, topicOrd, msg, plc, ordered, timeout, 
skipOnTimeout);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection 
[topic=" + topic + ", msg=" +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 69d151e..c13be53 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -358,23 +358,12 @@ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V
             log.debug("Sending cache message [msg=" + msg + ", node=" + 
U.toShortString(node) + ']');
 
         int cnt = 0;
-        boolean first = true;
 
         while (cnt <= retryCnt) {
             try {
                 cnt++;
 
-                GridCacheMessage<K, V> msg0;
-
-                if (first) {
-                    msg0 = msg;
-
-                    first = false;
-                }
-                else
-                    msg0 = (GridCacheMessage<K, V>)msg.clone();
-
-                cctx.gridIO().send(node, TOPIC_CACHE, msg0, plc);
+                cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
 
                 return;
             }
@@ -426,7 +415,6 @@ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V
         final Collection<UUID> leftIds = new GridLeanSet<>();
 
         int cnt = 0;
-        boolean first = true;
 
         while (cnt < retryCnt) {
             try {
@@ -436,17 +424,7 @@ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V
                     }
                 });
 
-                GridCacheMessage<K, V> msg0;
-
-                if (first) {
-                    msg0 = msg;
-
-                    first = false;
-                }
-                else
-                    msg0 = (GridCacheMessage<K, V>)msg.clone();
-
-                cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL);
+                cctx.gridIO().send(nodesView, TOPIC_CACHE, msg, SYSTEM_POOL);
 
                 boolean added = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1fd9571..bf24a85 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1132,13 +1132,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         if (msg.data() != null && (nodes.size() > 1 || 
!ctx.localNodeId().equals(F.first(nodes).id())))
             msg.dataBytes(marsh.marshal(msg.data()));
 
-        boolean first = true;
-
         for (ClusterNode node : nodes) {
-            msg = first ? msg : (GridContinuousMessage)msg.clone();
-
-            first = false;
-
             int cnt = 0;
 
             while (cnt <= retryCnt) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89977d7b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java
index f0eabc6..1842403 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteWorker.java
@@ -332,15 +332,9 @@ public class GridGgfsDeleteWorker extends GridGgfsThread {
 
         Collection<ClusterNode> nodes = meta.metaCacheNodes();
 
-        boolean first = true;
-
         for (ClusterNode node : nodes) {
-            GridGgfsCommunicationMessage msg0 = first ? msg : 
(GridGgfsCommunicationMessage)msg.clone();
-
-            first = false;
-
             try {
-                ggfsCtx.send(node, topic, msg0, GridIoPolicy.SYSTEM_POOL);
+                ggfsCtx.send(node, topic, msg, GridIoPolicy.SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 U.warn(log, "Failed to send GGFS delete message to node 
[nodeId=" + node.id() +

Reply via email to