IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/30d96ad5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/30d96ad5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/30d96ad5 Branch: refs/heads/ignite-45 Commit: 30d96ad594b2d9372a400847068e803bd3c9631a Parents: 55a9c50 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Mar 3 19:21:08 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Mar 3 19:21:08 2015 -0800 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 66 +++++++++++++++++--- .../affinity/AffinityTopologyVersion.java | 23 ++++++- .../affinity/GridAffinityAssignmentCache.java | 6 ++ .../cache/DynamicCacheDescriptor.java | 28 ++++++--- .../GridCachePartitionExchangeManager.java | 54 ++++++++++------ .../processors/cache/GridCacheProcessor.java | 36 ++++++++--- .../cache/GridCacheSharedContext.java | 7 +++ .../dht/GridDhtPartitionTopologyImpl.java | 2 +- .../preloader/GridDhtPartitionExchangeId.java | 5 +- .../GridDhtPartitionsExchangeFuture.java | 34 +++++++++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 1 + .../cache/IgniteDynamicCacheStartSelfTest.java | 13 ++++ 12 files changed, 219 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index dce04e2..d891149 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -166,6 +166,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Custom event listener. */ private GridPlainInClosure<Serializable> customEvtLsnr; + /** Map of dynamic cache filters. */ + private Map<String, IgnitePredicate<ClusterNode>> dynamicCacheFilters = new HashMap<>(); + /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); @@ -214,6 +217,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { getSpi().setNodeAttributes(attrs, ver); } + /** + * Adds dynamic cache filters. + * + * @param cacheName Cache name. + * @param filter Cache filter. + */ + public void addDynamicCacheFilter(String cacheName, IgnitePredicate<ClusterNode> filter) { + IgnitePredicate<ClusterNode> old = dynamicCacheFilters.put(cacheName, filter); + + assert old == null; + } + /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { discoOrdered = discoOrdered(); @@ -277,10 +292,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { c.updateAlives(node); } + if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + try { + customEvtLsnr.apply(data); + } + catch (Exception e) { + U.error(log, "Failed to notify direct custom event listener: " + data, e); + } + } + // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. - if (type != EVT_NODE_METRICS_UPDATED && type != DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + if (type != EVT_NODE_METRICS_UPDATED) { DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); discoCacheHist.put(topVer, cache); @@ -307,15 +331,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return; } - if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { - try { - customEvtLsnr.apply(data); - } - catch (Exception e) { - U.error(log, "Failed to notify direct custom event listener: " + data, e); - } - } - if (topVer > 0 && (type == EVT_NODE_JOINED || type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)) { boolean set = GridDiscoveryManager.this.topVer.setIfGreater(topVer); @@ -1834,6 +1849,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { GridCacheAttributes[] caches = node.attribute(ATTR_CACHE); + boolean hasCaches = false; + if (caches != null) { nodesWithCaches.add(node); @@ -1860,6 +1877,35 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } + hasCaches = true; + } + + for (Map.Entry<String, IgnitePredicate<ClusterNode>> entry : dynamicCacheFilters.entrySet()) { + String cacheName = entry.getKey(); + IgnitePredicate<ClusterNode> filter = entry.getValue(); + + if (filter.apply(node)) { + addToMap(cacheMap, cacheName, node); + + if (alive(node.id())) + addToMap(aliveCacheNodes, maskNull(cacheName), node); + + addToMap(dhtNodesMap, cacheName, node); + + // TODO IGNITE-45 client and near caches. + + if (!loc.id().equals(node.id())) { + addToMap(rmtCacheMap, cacheName, node); + + if (alive(node.id())) + addToMap(aliveRmtCacheNodes, maskNull(cacheName), node); + } + + hasCaches = true; + } + } + + if (hasCaches) { if (alive(node.id())) { aliveNodesWithCaches.add(node); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java index e276253..cb24ecd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java @@ -89,17 +89,34 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi /** {@inheritDoc} */ @Override public int compareTo(AffinityTopologyVersion o) { - return Long.compare(topVer, o.topVer); + int cmp = Long.compare(topVer, o.topVer); + + if (cmp == 0) + return Integer.compare(minorTopVer, o.minorTopVer); + + return cmp; } /** {@inheritDoc} */ @Override public boolean equals(Object o) { - return o instanceof AffinityTopologyVersion && topVer == ((AffinityTopologyVersion)o).topVer; + if (this == o) + return true; + + if (!(o instanceof AffinityTopologyVersion)) + return false; + + AffinityTopologyVersion that = (AffinityTopologyVersion)o; + + return minorTopVer == that.minorTopVer && topVer == that.topVer; } /** {@inheritDoc} */ @Override public int hashCode() { - return (int)topVer; + int result = (int)(topVer ^ (topVer >>> 32)); + + result = 31 * result + minorTopVer; + + return result; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index b964f83..4bb5885 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -150,6 +150,8 @@ public class GridAffinityAssignmentCache { if (log.isDebugEnabled()) log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvt=" + discoEvt + ']'); + U.debug(log, "Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + + ", discoEvt=" + discoEvt + ']'); GridAffinityAssignment prev = affCache.get(topVer.previous()); @@ -162,6 +164,8 @@ public class GridAffinityAssignmentCache { // Resolve nodes snapshot for specified topology version. Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer.topologyVersion()); + U.debug(log, "Affinity nodes: " + nodes); + sorted = sort(nodes); } @@ -187,6 +191,8 @@ public class GridAffinityAssignmentCache { GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment); + U.debug(log, "Updated assignment: " + updated); + updated = F.addIfAbsent(affCache, topVer, updated); // Update top version, if required. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 196730c..6a6e227 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -28,28 +29,35 @@ import java.io.*; * Cache start descriptor. */ public class DynamicCacheDescriptor implements Serializable { + /** Cache start ID. */ + private IgniteUuid startId; + /** Cache configuration. */ @GridToStringExclude private CacheConfiguration cacheCfg; /** Deploy filter bytes. */ @GridToStringExclude - private byte[] deployFltrBytes; - - /** Cache start ID. */ - private IgniteUuid startId; + private IgnitePredicate<ClusterNode> nodeFilter; /** * @param cacheCfg Cache configuration. - * @param deployFltrBytes Deployment filter bytes. + * @param nodeFilter Node filter. */ - public DynamicCacheDescriptor(CacheConfiguration cacheCfg, byte[] deployFltrBytes, IgniteUuid startId) { + public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid startId) { this.cacheCfg = cacheCfg; - this.deployFltrBytes = deployFltrBytes; + this.nodeFilter = nodeFilter; this.startId = startId; } /** + * @return Start ID. + */ + public IgniteUuid startId() { + return startId; + } + + /** * @return Cache configuration. */ public CacheConfiguration cacheConfiguration() { @@ -57,10 +65,10 @@ public class DynamicCacheDescriptor implements Serializable { } /** - * @return Start ID. + * @return Node filter. */ - public IgniteUuid startId() { - return startId; + public IgnitePredicate<ClusterNode> nodeFilter() { + return nodeFilter; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 76ecea4..d38161e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.events.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -114,6 +115,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana final ClusterNode n = e.eventNode(); + GridDhtPartitionExchangeId exchId = null; + GridDhtPartitionsExchangeFuture<K, V> exchFut = null; + if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) { assert !loc.id().equals(n.id()); @@ -129,12 +133,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana "Node joined with smaller-than-local " + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - GridDhtPartitionExchangeId exchId = exchangeId(n.id(), + exchId = exchangeId(n.id(), new AffinityTopologyVersion(e.topologyVersion(), minorTopVer = 0), e.type()); - GridDhtPartitionsExchangeFuture<K, V> exchFut = exchangeFuture(exchId, e); + exchFut = exchangeFuture(exchId, e, null); + } + else { + DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; + + if (customEvt.data() instanceof DynamicCacheDescriptor) { + DynamicCacheDescriptor desc = (DynamicCacheDescriptor)customEvt.data(); + + // Check if this event should trigger partition exchange. + if (cctx.cache().dynamicCacheRegistered(desc)) { + exchId = exchangeId(n.id(), + new AffinityTopologyVersion(e.topologyVersion(), ++minorTopVer), + e.type()); + + exchFut = exchangeFuture(exchId, e, desc); + } + } + } + if (exchId != null) { // Start exchange process. pendingExchangeFuts.add(exchFut); @@ -161,9 +183,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } }); } - else { - // TODO. - } } finally { leaveBusy(); @@ -225,7 +244,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert discoEvt.topologyVersion() == startTopVer.topologyVersion(); - GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt); + GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt, null); new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); @@ -399,16 +418,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * Callback to start exchange for dynamically started cache. - * - * @param cacheDesc Cache descriptor. - */ - public void onCacheDeployed(DynamicCacheDescriptor cacheDesc) { - // TODO IGNITE-45 move to exchange future. - cctx.kernalContext().cache().onCacheStartFinished(cacheDesc); - } - - /** * @return {@code True} if topology has changed. */ public boolean topologyChanged() { @@ -579,11 +588,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return Exchange future. */ GridDhtPartitionsExchangeFuture<K, V> exchangeFuture(GridDhtPartitionExchangeId exchId, - @Nullable DiscoveryEvent discoEvt) { + @Nullable DiscoveryEvent discoEvt, @Nullable DynamicCacheDescriptor startDesc) { GridDhtPartitionsExchangeFuture<K, V> fut; GridDhtPartitionsExchangeFuture<K, V> old = exchFuts.addx( - fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId)); + fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, startDesc)); if (old != null) fut = old; @@ -606,6 +615,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana fut.cleanUp(); } } + + DynamicCacheDescriptor desc = exchFut.dynamicCacheDescriptor(); + + if (desc != null) + cctx.cache().onCacheStartFinished(desc); } /** @@ -654,7 +668,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana refreshPartitions(); } else - exchangeFuture(msg.exchangeId(), null).onReceive(node.id(), msg); + exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); } finally { leaveBusy(); @@ -692,7 +706,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana scheduleResendPartitions(); } else - exchangeFuture(msg.exchangeId(), null).onReceive(node.id(), msg); + exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); } finally { leaveBusy(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 069930e..a22c9a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1170,6 +1170,30 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param desc Descriptor to check. + * @return {@code True} if cache was registered for start and exchange future should be created. + */ + public boolean dynamicCacheRegistered(DynamicCacheDescriptor desc) { + return dynamicCaches.get(desc.cacheConfiguration().getName()) == desc; + } + + /** + * @param startDesc Start descriptor. + */ + public void onCacheStartExchange(DynamicCacheDescriptor startDesc) throws IgniteCheckedException { + CacheConfiguration cfg = new CacheConfiguration(startDesc.cacheConfiguration()); + + initialize(cfg); + + GridCacheContext cacheCtx = createCache(cfg); + + sharedCtx.addCacheContext(cacheCtx); + + startCache(cacheCtx.cache()); + onKernalStart(cacheCtx.cache()); + } + + /** * Callback invoked when first exchange future for dynamic cache is completed. * * @param startDesc Cache start descriptor. @@ -1256,8 +1280,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, IgniteUuid.fromUuid(ctx.localNodeId())); try { - byte[] filterBytes = ctx.config().getMarshaller().marshal(nodeFilter); - for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) { if (ccfg0.getName().equals(ccfg.getName())) return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + @@ -1274,11 +1296,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + "(a cache with the same name is already started): " + ccfg.getName())); - ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, filterBytes, fut.startId())); + ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, nodeFilter, fut.startId())); return fut; } - catch (IgniteCheckedException e) { + catch (Exception e) { fut.onDone(e); // Safety. @@ -1315,11 +1337,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc); - assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; - - // TODO IGNITE-45 create cache context here. + ctx.discovery().addDynamicCacheFilter(ccfg.getName(), startDesc.nodeFilter()); - sharedCtx.exchange().onCacheDeployed(startDesc); + assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index e133a17..aadb153 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -115,6 +115,13 @@ public class GridCacheSharedContext<K, V> { } /** + * @return Cache processor. + */ + public GridCacheProcessor cache() { + return kernalCtx.cache(); + } + + /** * Adds cache context to shared cache context. * * @param cacheCtx Cache context to add. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 4af7534..e86996d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -209,7 +209,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; - if (!exchId.isJoined()) + if (exchId.isLeft()) removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java index d101efd..1145bdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; +import org.apache.ignite.internal.events.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -28,6 +29,7 @@ import java.nio.*; import java.util.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; /** * Exchange ID. @@ -54,7 +56,8 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa */ public GridDhtPartitionExchangeId(UUID nodeId, int evt, @NotNull AffinityTopologyVersion topVer) { assert nodeId != null; - assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED; + assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED || + evt == EVT_DISCOVERY_CUSTOM_EVT; assert topVer.topologyVersion() > 0; this.nodeId = nodeId; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 81ab4bf..de87904 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -142,6 +142,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff /** Logger. */ private IgniteLogger log; + /** Dynamic cache start descriptor. */ + private DynamicCacheDescriptor startDesc; + /** * Dummy future created to trigger reassignments if partition * topology changed while preloading. @@ -197,8 +200,12 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff * @param busyLock Busy lock. * @param exchId Exchange ID. */ - public GridDhtPartitionsExchangeFuture(GridCacheSharedContext<K, V> cctx, ReadWriteLock busyLock, - GridDhtPartitionExchangeId exchId) { + public GridDhtPartitionsExchangeFuture( + GridCacheSharedContext<K, V> cctx, + ReadWriteLock busyLock, + GridDhtPartitionExchangeId exchId, + DynamicCacheDescriptor startDesc + ) { super(cctx.kernalContext()); syncNotify(true); @@ -213,6 +220,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff this.cctx = cctx; this.busyLock = busyLock; this.exchId = exchId; + this.startDesc = startDesc; log = cctx.logger(getClass()); @@ -379,6 +387,13 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff } /** + * @return Dynamic cache descriptor. + */ + public DynamicCacheDescriptor dynamicCacheDescriptor() { + return startDesc; + } + + /** * @return Init future. */ IgniteInternalFuture<?> initFuture() { @@ -422,6 +437,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff // will return corresponding nodes. U.await(evtLatch); + if (startDesc != null) + startCache(); + assert discoEvt != null; assert exchId.nodeId().equals(discoEvt.eventNode().id()); @@ -433,7 +451,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff } // Grab all alive remote nodes with order of equal or less than last joined node. - rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, exchId.topologyVersion().topologyVersion())); + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, + exchId.topologyVersion().topologyVersion())); rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); @@ -547,6 +566,15 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff } /** + * Starts dynamic cache. + */ + private void startCache() throws IgniteCheckedException { + assert startDesc != null; + + ctx.cache().onCacheStartExchange(startDesc); + } + + /** * @param node Node. * @param id ID. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 34995ba..81b128e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -4481,6 +4481,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } msg.verify(getLocalNodeId()); + msg.topologyVersion(ring.topologyVersion()); } if (msg.verified()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 5d515e3..efb5db2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -136,4 +136,17 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { assertEquals(1, succeeded); assertEquals(threadNum - 1, failed); } + + /** + * @throws Exception If failed. + */ + public void testStartCacheSimple() throws Exception { + final IgniteKernal kernal = (IgniteKernal)grid(0); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName("TestCacheName3"); + + kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get(); + } }