IGNITE-141 - Marshallers refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/74078f6a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/74078f6a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/74078f6a Branch: refs/heads/ignite-141 Commit: 74078f6ad397a823bb5d5a687213d72fb31899d1 Parents: a06dc4a Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Mar 3 15:25:23 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Mar 3 15:25:23 2015 -0800 ---------------------------------------------------------------------- .../ignite/internal/GridEventConsumeHandler.java | 11 ++++++++++- .../ignite/internal/GridMessageListenHandler.java | 11 ++++++++++- .../cache/query/GridCacheQueryManager.java | 3 ++- .../continuous/CacheContinuousQueryHandler.java | 11 ++++++++++- .../continuous/GridContinuousHandler.java | 9 ++++++++- .../continuous/GridContinuousProcessor.java | 8 +++----- .../continuous/GridEventConsumeSelfTest.java | 3 ++- .../internal/processors/igfs/IgfsSizeSelfTest.java | 17 ----------------- 8 files changed, 45 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 68d8c0b..fda5ebd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -283,6 +282,16 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousHandler clone() { + try { + return (GridContinuousHandler)super.clone(); + } + catch (CloneNotSupportedException e) { + throw new IllegalStateException(e); + } + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { boolean b = filterBytes != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 199d0ac..6412b63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -163,6 +162,16 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousHandler clone() { + try { + return (GridContinuousHandler)super.clone(); + } + catch (CloneNotSupportedException e) { + throw new IllegalStateException(e); + } + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeBoolean(depEnabled); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 8fa48aa..0d03e36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1958,7 +1958,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte }, new P1<GridCache<?, ?>>() { @Override public boolean apply(GridCache<?, ?> c) { - return !CU.UTILITY_CACHE_NAME.equals(c.name()) && !CU.ATOMICS_CACHE_NAME.equals(c.name()); + return !CU.MARSH_CACHE_NAME.equals(c.name()) && !CU.UTILITY_CACHE_NAME.equals(c.name()) && + !CU.ATOMICS_CACHE_NAME.equals(c.name()); } } ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 9502b3f..69e12b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -370,6 +369,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousHandler clone() { + try { + return (GridContinuousHandler)super.clone(); + } + catch (CloneNotSupportedException e) { + throw new IllegalStateException(e); + } + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, cacheName); out.writeObject(topic); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 17c7a0a..69639c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -27,7 +27,7 @@ import java.util.*; /** * Continuous routine handler. */ -public interface GridContinuousHandler extends Externalizable { +public interface GridContinuousHandler extends Externalizable, Cloneable { /** * Registers listener. * @@ -89,6 +89,13 @@ public interface GridContinuousHandler extends Externalizable { @Nullable public Object orderedTopic(); /** + * Clones this handler. + * + * @return Clone of this handler. + */ + public GridContinuousHandler clone(); + + /** * @return {@code True} if for events. */ public boolean isForEvents(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index eed273d..0948211 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -355,9 +355,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Register handler only if local node passes projection predicate. if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) { try { - if (ctx.config().isPeerClassLoadingEnabled()) - item.hnd.p2pUnmarshal(data.nodeId, ctx); - if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe, false)) item.hnd.onListenerRegistered(item.routineId, ctx); @@ -394,7 +391,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Generate ID. final UUID routineId = UUID.randomUUID(); - StartRequestData reqData = new StartRequestData(prjPred, hnd, bufSize, interval, autoUnsubscribe); + StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval, autoUnsubscribe); try { if (ctx.config().isPeerClassLoadingEnabled()) { @@ -416,7 +413,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } // Handle peer deployment for other handler-specific objects. - hnd.p2pMarshal(ctx); + reqData.hnd.p2pMarshal(ctx); } } catch (IgniteCheckedException e) { @@ -520,6 +517,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (!nodes.isEmpty()) { // Do not send projection predicate (nodes already filtered). reqData.prjPred = null; + reqData.prjPredBytes = null; // Send start requests. try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java index 459786c..a51d1a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.resources.*; import org.apache.ignite.testframework.junits.common.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -41,7 +42,7 @@ import static org.apache.ignite.internal.processors.continuous.GridContinuousPro /** * Event consume test. */ -public class GridEventConsumeSelfTest extends GridCommonAbstractTest { +public class GridEventConsumeSelfTest extends GridCommonAbstractTest implements Serializable { /** */ private static final String PRJ_PRED_CLS_NAME = "org.apache.ignite.tests.p2p.GridEventConsumeProjectionPredicate"; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74078f6a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java index b212f02..40bb2ac 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; @@ -44,7 +43,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CachePreloadMode.*; -import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; @@ -600,25 +598,10 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest { assertEquals(expSize, cache.igfsDataSpaceUsed()); } - // Start a node. - final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1); - - for (int i = 0; i < GRID_CNT - 1; i++) { - grid(0).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - latch.countDown(); - - return true; - } - }, EVT_CACHE_PRELOAD_STOPPED); - } - Ignite g = startGrid(GRID_CNT); info("Started grid: " + g.cluster().localNode().id()); - U.awaitQuiet(latch); - // Wait partitions are evicted. awaitPartitionMapExchange();