# IGNITE-489 Use CacheRebalancingEvent instead of CacheEvent.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/19a895dc Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/19a895dc Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/19a895dc Branch: refs/heads/ignite-341 Commit: 19a895dcb8e1a64ee7b060188ce3eb3c48161d4e Parents: 5b0a5f3 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Wed Mar 25 16:47:40 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Wed Mar 25 16:47:40 2015 +0300 ---------------------------------------------------------------------- .../ignite/events/CacheRebalancingEvent.java | 1 + .../processors/cache/GridCacheEventManager.java | 3 +-- .../cache/GridCachePartitionExchangeManager.java | 2 +- .../dht/GridClientPartitionTopology.java | 10 +++++----- .../dht/GridDhtPartitionTopology.java | 4 ++-- .../dht/GridDhtPartitionTopologyImpl.java | 19 ++++++++++--------- .../preloader/GridDhtPartitionDemandPool.java | 7 ++++--- ...GridCachePartitionNotLoadedEventSelfTest.java | 2 +- 8 files changed, 25 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/19a895dc/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java index 661c2b5..ed40161 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; * @see EventType#EVT_CACHE_REBALANCE_PART_UNLOADED * @see EventType#EVT_CACHE_REBALANCE_STARTED * @see EventType#EVT_CACHE_REBALANCE_STOPPED + * @see EventType#EVT_CACHE_REBALANCE_PART_DATA_LOST */ public class CacheRebalancingEvent extends EventAdapter { /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/19a895dc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index 4d1e166..f543e3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -240,8 +240,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { @Nullable String cloClsName, @Nullable String taskName ) { - assert key != null || type == EVT_CACHE_STARTED || type == EVT_CACHE_STOPPED - || type == EVT_CACHE_REBALANCE_PART_DATA_LOST; + assert key != null || type == EVT_CACHE_STARTED || type == EVT_CACHE_STOPPED; if (!cctx.events().isRecordable(type)) LT.warn(log, null, "Added event without checking if event is recordable: " + U.gridEventName(type)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/19a895dc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 1c22b44..d2d3531 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -914,7 +914,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (cacheCtx.isLocal()) continue; - changed |= cacheCtx.topology().afterExchange(exchFut.exchangeId()); + changed |= cacheCtx.topology().afterExchange(exchFut); // Preload event notification. if (cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/19a895dc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index a38a51e..4640192 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -254,17 +254,17 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException { - AffinityTopologyVersion topVer = exchId.topologyVersion(); + @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { + AffinityTopologyVersion topVer = exchFut.topologyVersion(); lock.writeLock().lock(); try { - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; + assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchFut.exchangeId() + ']'; if (log.isDebugEnabled()) - log.debug("Partition map before afterExchange [exchId=" + exchId + ", fullMap=" + + log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); updateSeq.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/19a895dc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index ac4b36a..c551fb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -85,11 +85,11 @@ public interface GridDhtPartitionTopology { /** * Post-initializes this topology. * - * @param exchId Exchange ID for this post-initialization. + * @param exchFut Exchange future. * @return {@code True} if mapping was changed. * @throws IgniteCheckedException If failed. */ - public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException; + public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException; /** * @param topVer Topology version at the time of creation. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/19a895dc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 593c463..c3571e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; @@ -26,7 +27,6 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -388,14 +388,14 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException { + @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { boolean changed = waitForRent(); ClusterNode loc = cctx.localNode(); int num = cctx.affinity().partitions(); - AffinityTopologyVersion topVer = exchId.topologyVersion(); + AffinityTopologyVersion topVer = exchFut.topologyVersion(); lock.writeLock().lock(); @@ -403,11 +403,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { if (stopping) return false; - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; + assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchFut.exchangeId() + ']'; if (log.isDebugEnabled()) - log.debug("Partition map before afterExchange [exchId=" + exchId + ", fullMap=" + + log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); long updateSeq = this.updateSeq.incrementAndGet(); @@ -443,9 +443,10 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { changed = true; if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - cctx.events().addEvent(p, null, cctx.localNodeId(), (IgniteUuid)null, - null, EVT_CACHE_REBALANCE_PART_DATA_LOST, null, false, null, - false, null, null, null); + DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + + cctx.events().addPreloadEvent(p, EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), + discoEvt.type(), discoEvt.timestamp()); } if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/19a895dc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 7d7f3b0..f20a751 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -1042,9 +1042,10 @@ public class GridDhtPartitionDemandPool<K, V> { top.own(part); if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - cctx.events().addEvent(p, null, cctx.localNodeId(), (IgniteUuid)null, - null, EVT_CACHE_REBALANCE_PART_DATA_LOST, null, false, null, - false, null, null, null); + DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + + cctx.events().addPreloadEvent(p, EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), + discoEvt.type(), discoEvt.timestamp()); } if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/19a895dc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java index 6e1f1a1..3ccc0bf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java @@ -140,7 +140,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract /** {@inheritDoc} */ @Override public boolean apply(Event evt) { - lostParts.add(((CacheEvent)evt).partition()); + lostParts.add(((CacheRebalancingEvent)evt).partition()); return true; }