Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 647691f17 -> 62d39de2a
IGNITE-45 - WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/32e26d30 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/32e26d30 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/32e26d30 Branch: refs/heads/ignite-45 Commit: 32e26d304337d952ce42fe18d6aa4453f2f1a212 Parents: 647691f Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Mar 4 12:50:36 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Mar 4 12:50:36 2015 -0800 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 6 ---- .../processors/cache/GridCacheProcessor.java | 8 ++++++ .../dht/GridDhtPartitionTopologyImpl.java | 2 +- .../distributed/dht/GridDhtTopologyFuture.java | 7 +++++ .../dht/atomic/GridDhtAtomicCache.java | 25 ++++------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 29 ++++---------------- .../GridDhtPartitionsExchangeFuture.java | 12 ++++++-- .../spi/discovery/tcp/TcpDiscoverySpi.java | 12 ++++++-- .../cache/IgniteDynamicCacheStartSelfTest.java | 20 +++++++++++++- 9 files changed, 66 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 4bb5885..b964f83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -150,8 +150,6 @@ public class GridAffinityAssignmentCache { if (log.isDebugEnabled()) log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvt=" + discoEvt + ']'); - U.debug(log, "Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + - ", discoEvt=" + discoEvt + ']'); GridAffinityAssignment prev = affCache.get(topVer.previous()); @@ -164,8 +162,6 @@ public class GridAffinityAssignmentCache { // Resolve nodes snapshot for specified topology version. Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer.topologyVersion()); - U.debug(log, "Affinity nodes: " + nodes); - sorted = sort(nodes); } @@ -191,8 +187,6 @@ public class GridAffinityAssignmentCache { GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment); - U.debug(log, "Updated assignment: " + updated); - updated = F.addIfAbsent(affCache, topVer, updated); // Update top version, if required. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index a22c9a2..9a8cbcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1191,6 +1191,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { startCache(cacheCtx.cache()); onKernalStart(cacheCtx.cache()); + + caches.put(cacheCtx.name(), cacheCtx.cache()); } /** @@ -1198,7 +1200,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param startDesc Cache start descriptor. */ + @SuppressWarnings("unchecked") public void onCacheStartFinished(DynamicCacheDescriptor startDesc) { + GridCacheAdapter<?, ?> cache = caches.get(startDesc.cacheConfiguration().getName()); + + if (cache != null) + jCacheProxies.put(cache.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + CacheConfiguration ccfg = startDesc.cacheConfiguration(); DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 8efd5eb..59a1cbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -221,7 +221,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id())) { + if (oldest.id().equals(loc.id()) || exchId.isCacheAdded()) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java index 704fadb..d704e13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -42,4 +42,11 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo * @throws IgniteCheckedException If topology future failed. */ public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException; + + /** + * Gets topology version of this future. + * + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index db592ea..3efdaca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -323,7 +323,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, false, - entry, filter); } @@ -340,7 +339,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - entry, filter); } @@ -425,7 +423,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (ctx.portableEnabled()) val = (V)ctx.marshalToPortable(val); - return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsPeekArray(val)); + return removeAllAsync0(F.asList(key), null, true, true, ctx.equalsPeekArray(val)); } /** {@inheritDoc} */ @@ -441,7 +439,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, true, - null, ctx.equalsPeekArray(oldVal)); } @@ -461,7 +458,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - null, filter); } @@ -482,7 +478,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - null, null); } @@ -498,7 +493,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { A.notNull(key, "key"); - return removeAllAsync0(Collections.singletonList(key), null, entry, true, false, filter); + return removeAllAsync0(Collections.singletonList(key), null, true, false, filter); } /** {@inheritDoc} */ @@ -512,7 +507,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgnitePredicate<Cache.Entry<K, V>>[] filter) { A.notNull(keys, "keys"); - return removeAllAsync0(keys, null, null, false, false, filter); + return removeAllAsync0(keys, null, false, false, filter); } /** {@inheritDoc} */ @@ -527,7 +522,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { A.notNull(key, "key"); - return removeAllAsync0(Collections.singletonList(key), null, entry, false, false, filter); + return removeAllAsync0(Collections.singletonList(key), null, false, false, filter); } /** {@inheritDoc} */ @@ -565,7 +560,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> conflictMap) { ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); - return removeAllAsync0(null, conflictMap, null, false, false, null); + return removeAllAsync0(null, conflictMap, false, false, null); } /** @@ -669,7 +664,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, false, - null, null); return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @@ -713,7 +707,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, false, - null, null); } @@ -743,7 +736,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, false, - null, null); } @@ -757,7 +749,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param conflictRmvMap Conflict remove map. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. - * @param cached Cached cache entry for key. May be passed if and only if map size is {@code 1}. * @param filter Cache entry filter for atomic updates. * @return Completion future. */ @@ -770,7 +761,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final Map<? extends K, GridCacheVersion> conflictRmvMap, final boolean retval, final boolean rawRetval, - @Nullable GridCacheEntryEx<K, V> cached, @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter ) { if (map != null && keyCheck) @@ -797,7 +787,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { conflictRmvMap != null ? conflictRmvMap.values() : null, retval, rawRetval, - cached, prj != null ? prj.expiry() : null, filter, subjId, @@ -817,7 +806,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * * @param keys Keys to remove. * @param conflictMap Conflict map. - * @param cached Cached cache entry for key. May be passed if and only if keys size is {@code 1}. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. * @param filter Cache entry filter for atomic removes. @@ -826,7 +814,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private IgniteInternalFuture removeAllAsync0( @Nullable final Collection<? extends K> keys, @Nullable final Map<? extends K, GridCacheVersion> conflictMap, - @Nullable GridCacheEntryEx<K, V> cached, final boolean retval, boolean rawRetval, @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter @@ -860,7 +847,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { keys != null ? null : conflictMap.values(), retval, rawRetval, - cached, (filter != null && prj != null) ? prj.expiry() : null, filter, subjId, @@ -2321,7 +2307,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { drRmvVals, req.returnValue(), false, - null, req.expiry(), req.filter(), req.subjectId(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 8a3ca8d..3cd4fea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -23,7 +23,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -107,9 +106,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> /** Return value require flag. */ private final boolean retval; - /** Cached entry if keys size is 1. */ - private GridCacheEntryEx<K, V> cached; - /** Expiry policy. */ private final ExpiryPolicy expiryPlc; @@ -179,7 +175,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param conflictRmvVals Conflict remove values (optional). * @param retval Return value require flag. * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. - * @param cached Cached entry if keys size is 1. * @param expiryPlc Expiry policy explicitly specified for cache operation. * @param filter Entry filter. * @param subjId Subject ID. @@ -197,7 +192,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> @Nullable Collection<GridCacheVersion> conflictRmvVals, final boolean retval, final boolean rawRetval, - @Nullable GridCacheEntryEx<K, V> cached, @Nullable ExpiryPolicy expiryPlc, final IgnitePredicate<Cache.Entry<K, V>>[] filter, UUID subjId, @@ -210,7 +204,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> assert vals == null || vals.size() == keys.size(); assert conflictPutVals == null || conflictPutVals.size() == keys.size(); assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); - assert cached == null || keys.size() == 1; assert subjId != null; this.cctx = cctx; @@ -223,7 +216,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> this.conflictPutVals = conflictPutVals; this.conflictRmvVals = conflictRmvVals; this.retval = retval; - this.cached = cached; this.expiryPlc = expiryPlc; this.filter = filter; this.subjId = subjId; @@ -431,18 +423,18 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> private void mapOnTopology(final Collection<? extends K> keys, final boolean remap, final UUID oldNodeId) { cache.topology().readLock(); - GridDiscoveryTopologySnapshot snapshot = null; + AffinityTopologyVersion topVer = null; try { GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { + topVer = fut.topologyVersion(); + if (futVer == null) // Assign future version in topology read lock before first exception may be thrown. futVer = cctx.versions().next(topVer); - // We are holding topology read lock and current topology is ready, we can start mapping. - snapshot = fut.topologySnapshot(); } else { fut.listenAsync(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @@ -454,23 +446,16 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> return; } - topVer = new AffinityTopologyVersion(snapshot.topologyVersion()); - mapTime = U.currentTimeMillis(); if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC)) cctx.mvcc().addAtomicFuture(version(), this); } - catch (IgniteCheckedException e) { - onDone(new IgniteCheckedException("Failed to get topology snapshot for update operation: " + this, e)); - - return; - } finally { cache.topology().readUnlock(); } - map0(snapshot, keys, remap, oldNodeId); + map0(topVer, keys, remap, oldNodeId); } /** @@ -488,19 +473,17 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } /** - * @param topSnapshot Topology snapshot to map on. * @param keys Keys to map. * @param remap Flag indicating if this is partial remap for this future. * @param oldNodeId Old node ID if was remap. */ - private void map0(GridDiscoveryTopologySnapshot topSnapshot, + private void map0( + AffinityTopologyVersion topVer, Collection<? extends K> keys, boolean remap, @Nullable UUID oldNodeId) { assert oldNodeId == null || remap; - AffinityTopologyVersion topVer = new AffinityTopologyVersion(topSnapshot.topologyVersion()); - Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index de87904..4741bf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -261,6 +261,11 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff return topSnapshot.get(); } + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return exchId.topologyVersion(); + } + /** * @return Dummy flag. */ @@ -662,11 +667,14 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff /** {@inheritDoc} */ @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) { - if (err == null) { - for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { + if (err == null) { if (!cacheCtx.isLocal()) cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10); } + + if (startDesc != null && F.eq(startDesc.cacheConfiguration().getName(), cacheCtx.name())) + cacheCtx.preloader().onInitialExchangeComplete(err); } cctx.exchange().onExchangeDone(this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 81b128e..46dae5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -4489,12 +4489,20 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov TcpDiscoverySpiState spiState = spiStateCopy(); + Map<Long, Collection<ClusterNode>> hist; + + synchronized (mux) { + hist = new TreeMap<>(topHist); + } + + Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion()); + if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), ring.node(msg.creatorNodeId()), - null, - null, + snapshot, + hist, msg.message()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32e26d30/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index efb5db2..70af7c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -144,9 +145,26 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { final IgniteKernal kernal = (IgniteKernal)grid(0); CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg.setName("TestCacheName3"); + String cacheName = "TestCacheName3"; + + ccfg.setName(cacheName); kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get(); + + for (int g = 0; g < nodeCount(); g++) { + IgniteKernal kernal0 = (IgniteKernal)grid(g); + + for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures()) + f.get(); + + assertNotNull(grid(g).jcache(cacheName)); + } + + grid(0).jcache(cacheName).put("1", "1"); + + for (int g = 0; g < nodeCount(); g++) + assertEquals("1", grid(g).jcache(cacheName).get("1")); } }