IGNITE-141 - Marshallers refactoring

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/74078f6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/74078f6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/74078f6a

Branch: refs/heads/ignite-141
Commit: 74078f6ad397a823bb5d5a687213d72fb31899d1
Parents: a06dc4a
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Tue Mar 3 15:25:23 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Tue Mar 3 15:25:23 2015 -0800

----------------------------------------------------------------------
 .../ignite/internal/GridEventConsumeHandler.java   | 11 ++++++++++-
 .../ignite/internal/GridMessageListenHandler.java  | 11 ++++++++++-
 .../cache/query/GridCacheQueryManager.java         |  3 ++-
 .../continuous/CacheContinuousQueryHandler.java    | 11 ++++++++++-
 .../continuous/GridContinuousHandler.java          |  9 ++++++++-
 .../continuous/GridContinuousProcessor.java        |  8 +++-----
 .../continuous/GridEventConsumeSelfTest.java       |  3 ++-
 .../internal/processors/igfs/IgfsSizeSelfTest.java | 17 -----------------
 8 files changed, 45 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 68d8c0b..fda5ebd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -283,6 +282,16 @@ class GridEventConsumeHandler implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousHandler clone() {
+        try {
+            return (GridContinuousHandler)super.clone();
+        }
+        catch (CloneNotSupportedException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         boolean b = filterBytes != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 199d0ac..6412b63 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -163,6 +162,16 @@ public class GridMessageListenHandler implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousHandler clone() {
+        try {
+            return (GridContinuousHandler)super.clone();
+        }
+        catch (CloneNotSupportedException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeBoolean(depEnabled);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 8fa48aa..0d03e36 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1958,7 +1958,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                 },
                 new P1<GridCache<?, ?>>() {
                     @Override public boolean apply(GridCache<?, ?> c) {
-                        return !CU.UTILITY_CACHE_NAME.equals(c.name()) && 
!CU.ATOMICS_CACHE_NAME.equals(c.name());
+                        return !CU.MARSH_CACHE_NAME.equals(c.name()) && 
!CU.UTILITY_CACHE_NAME.equals(c.name()) &&
+                            !CU.ATOMICS_CACHE_NAME.equals(c.name());
                     }
                 }
             );

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 9502b3f..69e12b5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
@@ -370,6 +369,16 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousHandler clone() {
+        try {
+            return (GridContinuousHandler)super.clone();
+        }
+        catch (CloneNotSupportedException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, cacheName);
         out.writeObject(topic);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 17c7a0a..69639c0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -27,7 +27,7 @@ import java.util.*;
 /**
  * Continuous routine handler.
  */
-public interface GridContinuousHandler extends Externalizable {
+public interface GridContinuousHandler extends Externalizable, Cloneable {
     /**
      * Registers listener.
      *
@@ -89,6 +89,13 @@ public interface GridContinuousHandler extends 
Externalizable {
     @Nullable public Object orderedTopic();
 
     /**
+     * Clones this handler.
+     *
+     * @return Clone of this handler.
+     */
+    public GridContinuousHandler clone();
+
+    /**
      * @return {@code True} if for events.
      */
     public boolean isForEvents();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index eed273d..0948211 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -355,9 +355,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 // Register handler only if local node passes projection 
predicate.
                 if (item.prjPred == null || 
item.prjPred.apply(ctx.discovery().localNode())) {
                     try {
-                        if (ctx.config().isPeerClassLoadingEnabled())
-                            item.hnd.p2pUnmarshal(data.nodeId, ctx);
-
                         if (registerHandler(data.nodeId, item.routineId, 
item.hnd, item.bufSize, item.interval,
                             item.autoUnsubscribe, false))
                             item.hnd.onListenerRegistered(item.routineId, ctx);
@@ -394,7 +391,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         // Generate ID.
         final UUID routineId = UUID.randomUUID();
 
-        StartRequestData reqData = new StartRequestData(prjPred, hnd, bufSize, 
interval, autoUnsubscribe);
+        StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), 
bufSize, interval, autoUnsubscribe);
 
         try {
             if (ctx.config().isPeerClassLoadingEnabled()) {
@@ -416,7 +413,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 }
 
                 // Handle peer deployment for other handler-specific objects.
-                hnd.p2pMarshal(ctx);
+                reqData.hnd.p2pMarshal(ctx);
             }
         }
         catch (IgniteCheckedException e) {
@@ -520,6 +517,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         if (!nodes.isEmpty()) {
             // Do not send projection predicate (nodes already filtered).
             reqData.prjPred = null;
+            reqData.prjPredBytes = null;
 
             // Send start requests.
             try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index 459786c..a51d1a8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.testframework.junits.common.*;
 
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -41,7 +42,7 @@ import static 
org.apache.ignite.internal.processors.continuous.GridContinuousPro
 /**
  * Event consume test.
  */
-public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
+public class GridEventConsumeSelfTest extends GridCommonAbstractTest 
implements Serializable {
     /** */
     private static final String PRJ_PRED_CLS_NAME = 
"org.apache.ignite.tests.p2p.GridEventConsumeProjectionPredicate";
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
index b212f02..40bb2ac 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -44,7 +43,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*;
 import static org.apache.ignite.cache.CacheDistributionMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CachePreloadMode.*;
-import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*;
 import static org.apache.ignite.transactions.TransactionConcurrency.*;
 import static org.apache.ignite.transactions.TransactionIsolation.*;
@@ -600,25 +598,10 @@ public class IgfsSizeSelfTest extends 
IgfsCommonAbstractTest {
             assertEquals(expSize, cache.igfsDataSpaceUsed());
         }
 
-        // Start a node.
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1);
-
-        for (int i = 0; i < GRID_CNT - 1; i++) {
-            grid(0).events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    latch.countDown();
-
-                    return true;
-                }
-            }, EVT_CACHE_PRELOAD_STOPPED);
-        }
-
         Ignite g = startGrid(GRID_CNT);
 
         info("Started grid: " + g.cluster().localNode().id());
 
-        U.awaitQuiet(latch);
-
         // Wait partitions are evicted.
         awaitPartitionMapExchange();
 

Reply via email to