IGNITE-45 - Fixing tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/384e5457 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/384e5457 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/384e5457 Branch: refs/heads/ignite-45 Commit: 384e5457e9e476ef589b69354118a365f51947bb Parents: 33c3d43 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Mar 17 15:01:02 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Mar 17 15:01:02 2015 -0700 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 27 +++++++++++++++----- .../dht/GridClientPartitionTopology.java | 15 +++++++++++ .../dht/GridDhtPartitionTopology.java | 7 ++++- .../dht/GridDhtPartitionTopologyImpl.java | 5 +++- .../preloader/GridDhtPartitionDemandPool.java | 15 +++-------- .../GridDhtPartitionsExchangeFuture.java | 10 ++++++-- .../preloader/GridDhtPartitionsFullMessage.java | 3 ++- .../GridCacheAtomicMessageCountSelfTest.java | 1 - .../GridCacheClientModesAbstractSelfTest.java | 6 ++++- 9 files changed, 65 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 1c4c0f8..78b009b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -355,6 +355,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param cacheId Cache ID. + */ + public GridClientPartitionTopology clearClientTopology(int cacheId) { + return clientTops.remove(cacheId); + } + + /** * Gets topology version of last completed partition exchange. * * @return Topology version. @@ -544,6 +551,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); } + // It is important that client topologies be added after contexts. for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); @@ -673,14 +681,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean updated = false; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - GridDhtPartitionTopology top = cacheCtx.topology(); + for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { + Integer cacheId = entry.getKey(); - GridDhtPartitionFullMap partMap = msg.partitions().get(cacheCtx.cacheId()); + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + GridDhtPartitionTopology top = null; + + if (cacheCtx == null) + top = clientTops.get(cacheId); + else if (!cacheCtx.isLocal()) + top = cacheCtx.topology(); + + if (top != null) + updated |= top.update(null, entry.getValue()) != null; - updated |= top.update(null, partMap) != null; - } } if (updated) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 27e1c22..d39d880 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -136,6 +136,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { @Override public void updateTopologyVersion( GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut, + long updSeq, boolean stopping ) { lock.writeLock().lock(); @@ -148,6 +149,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { topVer = exchId.topologyVersion(); + updateSeq.setIfGreater(updSeq); + topReadyFut = exchFut; } finally { @@ -427,6 +430,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + public long lastUpdateSequence() { + lock.writeLock().lock(); + + try { + return updateSeq.incrementAndGet(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) { lock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index dcc0502..21d46b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -47,7 +47,12 @@ public interface GridDhtPartitionTopology { * @param exchId Exchange ID. * @param exchFut Exchange future. */ - public void updateTopologyVersion(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut, boolean stopping); + public void updateTopologyVersion( + GridDhtPartitionExchangeId exchId, + GridDhtPartitionsExchangeFuture exchFut, + long updateSeq, + boolean stopping + ); /** * Topology version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index c123e92..60e9b70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -158,6 +158,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { @Override public void updateTopologyVersion( GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut, + long updSeq, boolean stopping ) { lock.writeLock().lock(); @@ -170,6 +171,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { topVer = exchId.topologyVersion(); + updateSeq.setIfGreater(updSeq); + topReadyFut = exchFut; } finally { @@ -229,7 +232,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldest(cctx.shared(), topVer); if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 3ba08a3..98128c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -36,7 +37,6 @@ import org.apache.ignite.lang.*; import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -73,7 +73,7 @@ public class GridDhtPartitionDemandPool<K, V> { /** Preload predicate. */ private IgnitePredicate<GridCacheEntryInfo> preloadPred; - /** Future for preload mode {@link org.apache.ignite.cache.CacheRebalanceMode#SYNC}. */ + /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */ @GridToStringInclude private SyncFuture syncFut; @@ -162,7 +162,7 @@ public class GridDhtPartitionDemandPool<K, V> { } /** - * @return Future for {@link org.apache.ignite.cache.CacheRebalanceMode#SYNC} mode. + * @return Future for {@link CacheRebalanceMode#SYNC} mode. */ IgniteInternalFuture<?> syncFuture() { return syncFut; @@ -826,7 +826,7 @@ public class GridDhtPartitionDemandPool<K, V> { try { cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get(); } - catch (IgniteInterruptedCheckedException e) { + catch (IgniteInterruptedCheckedException ignored) { if (log.isDebugEnabled()) log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " + "[cacheName=" + cctx.name() + ']'); @@ -1084,13 +1084,6 @@ public class GridDhtPartitionDemandPool<K, V> { } /** - * Empty constructor required for {@link Externalizable}. - */ - public SyncFuture() { - assert false; - } - - /** * @param w Worker who iterated through all partitions. */ void onWorkerDone(DemandWorker w) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a8cc921..dff7dd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -439,9 +439,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert exchId.nodeId().equals(discoEvt.eventNode().id()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology( + cacheCtx.cacheId()); + + long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence(); + // Update before waiting for locks. if (!cacheCtx.isLocal()) - cacheCtx.topology().updateTopologyVersion(exchId, this, stopping(cacheCtx.cacheId())); + cacheCtx.topology().updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); } // Grab all alive remote nodes with order of equal or less than last joined node. @@ -509,7 +514,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - top.updateTopologyVersion(exchId, this, stopping(top.cacheId())); + top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId())); top.beforeExchange(this); } @@ -645,6 +650,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); } + // It is important that client topologies be added after contexts. for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index b4bf7f8..8256274 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -82,7 +82,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param fullMap Full partitions map. */ public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) { - parts.put(cacheId, fullMap); + if (!parts.containsKey(cacheId)) + parts.put(cacheId, fullMap); } /** {@inheritDoc} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java index 8e8e7be..ac71c6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java @@ -71,7 +71,6 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest cCfg.setWriteSynchronizationMode(FULL_SYNC); cCfg.setAtomicWriteOrderMode(writeOrderMode); - // TODO IGNITE-45 test hangs. (client node started first) if (idx == 0 && client) cfg.setClientMode(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/384e5457/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java index 9030365..557cb41 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java @@ -49,12 +49,15 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst gridCnt = new AtomicInteger(); super.beforeTestsStarted(); + + if (!clientOnly()) { + grid(nearOnlyGridName).createCache(new NearCacheConfiguration()); + } } @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - // TODO IGNITE-45 test hangs if (gridCnt.getAndIncrement() == 0) { cfg.setClientMode(true); @@ -65,6 +68,7 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { CacheConfiguration cfg = super.cacheConfiguration(gridName);