IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/03758eaa Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/03758eaa Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/03758eaa Branch: refs/heads/ignite-45 Commit: 03758eaabdaf3b0cb69016fac66ec85f95d06a60 Parents: 32e26d3 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Mar 4 17:41:54 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Mar 4 17:41:54 2015 -0800 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 11 +- .../cache/DynamicCacheChangeBatch.java | 45 +++ .../cache/DynamicCacheChangeRequest.java | 120 ++++++++ .../cache/DynamicCacheDescriptor.java | 31 +- .../processors/cache/GridCacheContext.java | 19 +- .../processors/cache/GridCacheGateway.java | 16 ++ .../processors/cache/GridCacheIoManager.java | 16 ++ .../GridCachePartitionExchangeManager.java | 26 +- .../processors/cache/GridCacheProcessor.java | 281 ++++++++++++++----- .../cache/GridCacheSharedContext.java | 7 +- .../processors/cache/IgniteCacheProxy.java | 7 + .../GridDhtPartitionsExchangeFuture.java | 54 ++-- .../cache/IgniteDynamicCacheStartSelfTest.java | 87 +++++- 13 files changed, 597 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index d891149..04ca3d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -218,7 +218,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Adds dynamic cache filters. + * Adds dynamic cache filter. * * @param cacheName Cache name. * @param filter Cache filter. @@ -229,6 +229,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { assert old == null; } + /** + * Removes dynamic cache filter. + * + * @param cacheName Cache name. + */ + public void removeDynamicCacheFilter(String cacheName) { + dynamicCacheFilters.remove(cacheName); + } + /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { discoOrdered = discoOrdered(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java new file mode 100644 index 0000000..d657707 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.*; +import java.util.*; + +/** + * Cache change batch. + */ +public class DynamicCacheChangeBatch implements Serializable { + /** Change requests. */ + private Collection<DynamicCacheChangeRequest> reqs; + + /** + * @param reqs Requests. + */ + public DynamicCacheChangeBatch( + Collection<DynamicCacheChangeRequest> reqs + ) { + this.reqs = reqs; + } + + /** + * @return Collection of change requests. + */ + public Collection<DynamicCacheChangeRequest> requests() { + return reqs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java new file mode 100644 index 0000000..2a61200 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.io.*; + +/** + * Cache start/stop request. + */ +public class DynamicCacheChangeRequest implements Serializable { + /** Start ID. */ + private IgniteUuid deploymentId; + + /** Stop cache name. */ + @GridToStringExclude + private final String stopName; + + /** Cache start configuration. */ + private final CacheConfiguration startCfg; + + /** Cache start node filter. */ + private final IgnitePredicate<ClusterNode> startNodeFltr; + + /** + * Constructor creates cache start request. + * + * @param startCfg Start cache configuration. + * @param startNodeFltr Start node filter. + */ + public DynamicCacheChangeRequest( + CacheConfiguration startCfg, + IgnitePredicate<ClusterNode> startNodeFltr + ) { + this.startCfg = startCfg; + this.startNodeFltr = startNodeFltr; + + deploymentId = IgniteUuid.randomUuid(); + stopName = null; + } + + /** + * Constructor creates cache stop request. + * + * @param stopName Cache stop name. + */ + public DynamicCacheChangeRequest(String stopName) { + this.stopName = stopName; + + startCfg = null; + startNodeFltr = null; + } + + /** + * @return Deployment ID. + */ + public IgniteUuid deploymentId() { + return deploymentId; + } + + /** + * @param deploymentId Deployment ID. + */ + public void deploymentId(IgniteUuid deploymentId) { + this.deploymentId = deploymentId; + } + + /** + * @return {@code True} if this is a start request. + */ + public boolean isStart() { + return startCfg != null; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return stopName != null ? stopName : startCfg.getName(); + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration startCacheConfiguration() { + return startCfg; + } + + /** + * @return Node filter. + */ + public IgnitePredicate<ClusterNode> startNodeFilter() { + return startNodeFltr; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 6a6e227..e7f7e2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -23,14 +23,12 @@ import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import java.io.*; - /** * Cache start descriptor. */ -public class DynamicCacheDescriptor implements Serializable { +public class DynamicCacheDescriptor { /** Cache start ID. */ - private IgniteUuid startId; + private IgniteUuid deploymentId; /** Cache configuration. */ @GridToStringExclude @@ -40,21 +38,24 @@ public class DynamicCacheDescriptor implements Serializable { @GridToStringExclude private IgnitePredicate<ClusterNode> nodeFilter; + /** Cancelled flag. */ + private boolean cancelled; + /** * @param cacheCfg Cache configuration. * @param nodeFilter Node filter. */ - public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid startId) { + public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid deploymentId) { this.cacheCfg = cacheCfg; this.nodeFilter = nodeFilter; - this.startId = startId; + this.deploymentId = deploymentId; } /** * @return Start ID. */ - public IgniteUuid startId() { - return startId; + public IgniteUuid deploymentId() { + return deploymentId; } /** @@ -71,6 +72,20 @@ public class DynamicCacheDescriptor implements Serializable { return nodeFilter; } + /** + * Sets cancelled flag. + */ + public void onCancelled() { + cancelled = true; + } + + /** + * @return Cancelled flag. + */ + public boolean cancelled() { + return cancelled; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheDescriptor.class, this, "cacheName", cacheCfg.getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 159fd1b..557b6e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -205,6 +205,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Conflict resolver. */ private GridCacheVersionAbstractConflictResolver conflictRslvr; + /** Dynamic cache deployment ID. */ + private IgniteUuid dynamicDeploymentId; + /** * Empty constructor required for {@link Externalizable}. */ @@ -328,6 +331,20 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @param dynamicDeploymentId Dynamic deployment ID. + */ + void dynamicDeploymentId(IgniteUuid dynamicDeploymentId) { + this.dynamicDeploymentId = dynamicDeploymentId; + } + + /** + * @return Dynamic deployment ID. + */ + public IgniteUuid dynamicDeploymentId() { + return dynamicDeploymentId; + } + + /** * Initialize conflict resolver after all managers are started. */ void initConflictResolver() { @@ -1033,7 +1050,7 @@ public class GridCacheContext<K, V> implements Externalizable { * Sets default affinity key mapper. */ public void defaultAffMapper(CacheAffinityKeyMapper dfltAffMapper) { - this.affMapper = dfltAffMapper; + affMapper = dfltAffMapper; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 2de235a..8969471 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -30,6 +30,9 @@ public class GridCacheGateway<K, V> { /** Context. */ private final GridCacheContext<K, V> ctx; + /** Stopped flag for dynamic caches. */ + private volatile boolean stopped; + /** * @param ctx Cache context. */ @@ -49,6 +52,9 @@ public class GridCacheGateway<K, V> { // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. try { + if (stopped) + throw new IllegalStateException("Dynamic cache has been concurrently stopped: " + ctx.name()); + ctx.kernalContext().gateway().readLock(); } catch (IllegalStateException e) { @@ -110,6 +116,9 @@ public class GridCacheGateway<K, V> { // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. try { + if (stopped) + throw new IllegalStateException("Dynamic cache has been concurrently stopped: " + ctx.name()); + ctx.kernalContext().gateway().readLock(); // Set thread local projection per call. @@ -152,4 +161,11 @@ public class GridCacheGateway<K, V> { ctx.kernalContext().gateway().readUnlock(); } } + + /** + * + */ + public void onStopped() { + stopped = true; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index b504e21..7a3ada7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -599,6 +599,22 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V } /** + * @param cacheId Cache ID to remove handlers for. + */ + public void removeHandlers(int cacheId) { + assert cacheId != 0; + + idxClsHandlers.remove(cacheId); + + for (Iterator<ListenerKey> iterator = clsHandlers.keySet().iterator(); iterator.hasNext(); ) { + ListenerKey key = iterator.next(); + + if (key.cacheId == cacheId) + iterator.remove(); + } + } + + /** * @param lsnr Listener to add. */ public void addDisconnectListener(GridDisconnectListener lsnr) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d38161e..f7f1f9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -142,16 +142,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else { DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; - if (customEvt.data() instanceof DynamicCacheDescriptor) { - DynamicCacheDescriptor desc = (DynamicCacheDescriptor)customEvt.data(); + if (customEvt.data() instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.data(); - // Check if this event should trigger partition exchange. - if (cctx.cache().dynamicCacheRegistered(desc)) { + Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size()); + + // Validate requests to check if event should trigger partition exchange. + for (DynamicCacheChangeRequest req : batch.requests()) { + if (cctx.cache().dynamicCacheRegistered(req)) + valid.add(req); + } + + if (!F.isEmpty(valid)) { exchId = exchangeId(n.id(), new AffinityTopologyVersion(e.topologyVersion(), ++minorTopVer), e.type()); - exchFut = exchangeFuture(exchId, e, desc); + exchFut = exchangeFuture(exchId, e, valid); } } } @@ -588,11 +595,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return Exchange future. */ GridDhtPartitionsExchangeFuture<K, V> exchangeFuture(GridDhtPartitionExchangeId exchId, - @Nullable DiscoveryEvent discoEvt, @Nullable DynamicCacheDescriptor startDesc) { + @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs) { GridDhtPartitionsExchangeFuture<K, V> fut; GridDhtPartitionsExchangeFuture<K, V> old = exchFuts.addx( - fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, startDesc)); + fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, reqs)); if (old != null) fut = old; @@ -615,11 +622,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana fut.cleanUp(); } } - - DynamicCacheDescriptor desc = exchFut.dynamicCacheDescriptor(); - - if (desc != null) - cctx.cache().onCacheStartFinished(desc); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 9a8cbcb..b7ac0af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -25,10 +25,7 @@ import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.events.*; -import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -109,7 +106,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { private IgniteTransactionsImpl transactions; /** Pending cache starts. */ - private ConcurrentMap<String, IgniteInternalFuture> pendingStarts = new ConcurrentHashMap<>(); + private ConcurrentMap<String, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>(); /** Dynamic caches. */ private ConcurrentMap<String, DynamicCacheDescriptor> dynamicCaches = new ConcurrentHashMap<>(); @@ -567,8 +564,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() { @Override public void apply(Serializable evt) { - if (evt instanceof DynamicCacheDescriptor) - onCacheStartRequested((DynamicCacheDescriptor)evt); + if (evt instanceof DynamicCacheChangeBatch) + onCacheChangeRequested((DynamicCacheChangeBatch)evt); } }); @@ -1170,23 +1167,29 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param desc Descriptor to check. - * @return {@code True} if cache was registered for start and exchange future should be created. + * @param req Request to check. + * @return {@code True} if change request was registered to apply. */ - public boolean dynamicCacheRegistered(DynamicCacheDescriptor desc) { - return dynamicCaches.get(desc.cacheConfiguration().getName()) == desc; + public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) { + DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName()); + + return desc != null && desc.deploymentId().equals(req.deploymentId()) && desc.cancelled() != req.isStart(); } /** - * @param startDesc Start descriptor. + * @param req Start request. */ - public void onCacheStartExchange(DynamicCacheDescriptor startDesc) throws IgniteCheckedException { - CacheConfiguration cfg = new CacheConfiguration(startDesc.cacheConfiguration()); + public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException { + assert req.isStart(); + + CacheConfiguration cfg = new CacheConfiguration(req.startCacheConfiguration()); initialize(cfg); GridCacheContext cacheCtx = createCache(cfg); + cacheCtx.dynamicDeploymentId(req.deploymentId()); + sharedCtx.addCacheContext(cacheCtx); startCache(cacheCtx.cache()); @@ -1196,26 +1199,58 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param req Stop request. + */ + public void prepareCacheStop(DynamicCacheChangeRequest req) { + assert !req.isStart(); + + // Break the proxy before exchange future is done. + IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(req.cacheName()); + + if (proxy != null) + proxy.gate().onStopped(); + + GridCacheAdapter<?, ?> cache = caches.remove(req.cacheName()); + + if (cache != null) { + GridCacheContext<?, ?> ctx = cache.context(); + + sharedCtx.removeCacheContext(ctx); + + assert req.deploymentId().equals(ctx.dynamicDeploymentId()); + + onKernalStop(cache, true); + stopCache(cache, true); + } + } + + /** * Callback invoked when first exchange future for dynamic cache is completed. * - * @param startDesc Cache start descriptor. + * @param req Change request. */ @SuppressWarnings("unchecked") - public void onCacheStartFinished(DynamicCacheDescriptor startDesc) { - GridCacheAdapter<?, ?> cache = caches.get(startDesc.cacheConfiguration().getName()); + public void onExchangeDone(DynamicCacheChangeRequest req) { + if (req.isStart()) { + GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); - if (cache != null) - jCacheProxies.put(cache.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + if (cache != null) + jCacheProxies.put(cache.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + } + else { + DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName()); - CacheConfiguration ccfg = startDesc.cacheConfiguration(); + if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) + dynamicCaches.remove(req.cacheName(), desc); + } - DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName()); + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.cacheName()); - if (fut != null && fut.startId().equals(startDesc.startId())) { - fut.onDone(); + assert req.deploymentId() != null; + assert fut == null || fut.deploymentId != null; - pendingStarts.remove(ccfg.getName(), fut); - } + if (fut != null && fut.deploymentId().equals(req.deploymentId())) + fut.onDone(); } /** @@ -1285,69 +1320,162 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (nodeFilter == null) nodeFilter = F.alwaysTrue(); - DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, IgniteUuid.fromUuid(ctx.localNodeId())); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ccfg, nodeFilter); - try { - for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) { - if (ccfg0.getName().equals(ccfg.getName())) - return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + - "(a cache with the same name is already configured): " + ccfg.getName())); - } + return F.first(initiateCacheChanges(F.asList(req))); + } - if (caches.containsKey(ccfg.getName())) - return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + - "(a cache with the same name is already started): " + ccfg.getName())); + /** + * @param cacheName Cache name to stop. + * @return Future that will be completed when cache is stopped. + */ + public IgniteInternalFuture<?> dynamicStopCache(String cacheName) { + return F.first(initiateCacheChanges(F.asList(new DynamicCacheChangeRequest(cacheName)))); + } - IgniteInternalFuture<?> old = pendingStarts.putIfAbsent(ccfg.getName(), fut); + /** + * @param reqs Requests. + * @return Collection of futures. + */ + public Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs) { + Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size()); - if (old != null) - return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + - "(a cache with the same name is already started): " + ccfg.getName())); + Collection<DynamicCacheChangeRequest> sendReqs = new ArrayList<>(reqs.size()); - ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, nodeFilter, fut.startId())); + for (DynamicCacheChangeRequest req : reqs) { + DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, req.cacheName(), req.deploymentId()); - return fut; - } - catch (Exception e) { - fut.onDone(e); + try { + for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) { + if (ccfg0.getName().equals(req.cacheName())) { + Exception ex = new IgniteCheckedException("Failed to " + + (req.isStart() ? "start" : "stop") + " cache " + + "(a cache with the same name is manually configured): " + ccfg0.getName()); + + fut.onDone(ex); + + break; + } + } + + if (fut.isDone()) + continue; - // Safety. - pendingStarts.remove(ccfg.getName(), fut); + if (req.isStart()) { + if (caches.containsKey(req.cacheName())) { + fut.onDone(new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already started): " + req.cacheName()))); + } + } + else { + GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); - return fut; + if (cache == null) + // No-op. + fut.onDone(); + else { + IgniteUuid dynamicDeploymentId = cache.context().dynamicDeploymentId(); + + assert dynamicDeploymentId != null; + + // Save deployment ID to avoid concurrent stops. + req.deploymentId(dynamicDeploymentId); + fut.deploymentId = dynamicDeploymentId; + } + } + + if (fut.isDone()) + continue; + + DynamicCacheStartFuture old = (DynamicCacheStartFuture)pendingFuts.putIfAbsent(req.cacheName(), fut); + + if (old != null) { + if (req.isStart()) { + fut.onDone(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already being started or stopped): " + req.cacheName())); + } + else { + fut = old; + + continue; + } + } + + if (fut.isDone()) + continue; + + sendReqs.add(req); + } + catch (Exception e) { + fut.onDone(e); + } + finally { + res.add(fut); + } } + + ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sendReqs)); + + return res; } /** * Callback invoked from discovery thread when cache deployment request is received. * - * @param startDesc Cache start descriptor. + * @param batch Change request batch. */ - private void onCacheStartRequested(DynamicCacheDescriptor startDesc) { - CacheConfiguration ccfg = startDesc.cacheConfiguration(); + private void onCacheChangeRequested(DynamicCacheChangeBatch batch) { + for (DynamicCacheChangeRequest req : batch.requests()) { + if (req.isStart()) { + CacheConfiguration ccfg = req.startCacheConfiguration(); - // Check if cache with the same name was concurrently started form different node. - if (dynamicCaches.containsKey(ccfg.getName())) { - // If local node initiated start, fail the start future. - DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName()); + // Check if cache with the same name was concurrently started form different node. + if (dynamicCaches.containsKey(ccfg.getName())) { + // If local node initiated start, fail the start future. + DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get(ccfg.getName()); - if (startFut != null && startFut.startId().equals(startDesc.startId())) { - assert !startFut.syncNotify(); + if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) { + assert !startFut.syncNotify(); - startFut.onDone(new IgniteCheckedException("Failed to start cache " + - "(a cache with the same name is already started): " + ccfg.getName())); + startFut.onDone(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already started): " + ccfg.getName())); + } - pendingStarts.remove(ccfg.getName(), startFut); + return; + } + + DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ccfg, req.startNodeFilter(), + req.deploymentId()); + + DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc); + + ctx.discovery().addDynamicCacheFilter(ccfg.getName(), startDesc.nodeFilter()); + + assert old == null : + "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; } + else { + DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName()); - return; - } + if (desc == null) { + // If local node initiated start, fail the start future. + DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(req.cacheName()); + + if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) { + assert !changeFut.syncNotify(); - DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc); + // No-op. + changeFut.onDone(); + } + + return; + } - ctx.discovery().addDynamicCacheFilter(ccfg.getName(), startDesc.nodeFilter()); + desc.onCancelled(); - assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; + ctx.discovery().removeDynamicCacheFilter(req.cacheName()); + } + } } /** @@ -2028,25 +2156,40 @@ public class GridCacheProcessor extends GridProcessorAdapter { * */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - private static class DynamicCacheStartFuture extends GridFutureAdapter<Object> { + public class DynamicCacheStartFuture extends GridFutureAdapter<Object> { /** Start ID. */ - private IgniteUuid startId; + private IgniteUuid deploymentId; + + /** Cache name. */ + private String cacheName; /** * @param ctx Kernal context. */ - private DynamicCacheStartFuture(GridKernalContext ctx, IgniteUuid startId) { + private DynamicCacheStartFuture(GridKernalContext ctx, String cacheName, IgniteUuid deploymentId) { // Start future can be completed from discovery thread, notification must NOT be sync. super(ctx, false); - this.startId = startId; + this.deploymentId = deploymentId; + this.cacheName = cacheName; } /** * @return Start ID. */ - private IgniteUuid startId() { - return startId; + public IgniteUuid deploymentId() { + return deploymentId; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + pendingFuts.remove(cacheName, this); + + return true; + } + + return false; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index aadb153..ef00131 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -143,7 +143,12 @@ public class GridCacheSharedContext<K, V> { * @param cacheCtx Cache context to remove. */ public void removeCacheContext(GridCacheContext cacheCtx) { - ctxMap.remove(cacheCtx.cacheId(), cacheCtx); + int cacheId = cacheCtx.cacheId(); + + ctxMap.remove(cacheId, cacheCtx); + + // Safely clean up the message listeners. + ioMgr.removeHandlers(cacheId); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b77b8db..cddae65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -102,6 +102,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return ctx; } + /** + * @return Gateway. + */ + public GridCacheGateway<K, V> gate() { + return gate; + } + /** {@inheritDoc} */ @Override public CacheMetrics metrics() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4741bf4..5a66b21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -142,8 +142,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff /** Logger. */ private IgniteLogger log; - /** Dynamic cache start descriptor. */ - private DynamicCacheDescriptor startDesc; + /** Dynamic cache change requests. */ + private Collection<DynamicCacheChangeRequest> reqs; /** * Dummy future created to trigger reassignments if partition @@ -204,7 +204,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff GridCacheSharedContext<K, V> cctx, ReadWriteLock busyLock, GridDhtPartitionExchangeId exchId, - DynamicCacheDescriptor startDesc + Collection<DynamicCacheChangeRequest> reqs ) { super(cctx.kernalContext()); @@ -220,7 +220,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff this.cctx = cctx; this.busyLock = busyLock; this.exchId = exchId; - this.startDesc = startDesc; + this.reqs = reqs; log = cctx.logger(getClass()); @@ -392,13 +392,6 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff } /** - * @return Dynamic cache descriptor. - */ - public DynamicCacheDescriptor dynamicCacheDescriptor() { - return startDesc; - } - - /** * @return Init future. */ IgniteInternalFuture<?> initFuture() { @@ -442,8 +435,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff // will return corresponding nodes. U.await(evtLatch); - if (startDesc != null) - startCache(); + if (!F.isEmpty(reqs)) + startCaches(); assert discoEvt != null; @@ -494,6 +487,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff if (log.isDebugEnabled()) log.debug("After waiting for partition release future: " + this); + if (!F.isEmpty(reqs)) + stopCaches(); + for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -571,12 +567,23 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff } /** - * Starts dynamic cache. + * Starts dynamic caches. */ - private void startCache() throws IgniteCheckedException { - assert startDesc != null; + private void startCaches() throws IgniteCheckedException { + for (DynamicCacheChangeRequest req : reqs) { + if (req.isStart()) + ctx.cache().prepareCacheStart(req); + } + } - ctx.cache().onCacheStartExchange(startDesc); + /** + * Stop dynamic caches. + */ + private void stopCaches() { + for (DynamicCacheChangeRequest req : reqs) { + if (!req.isStart()) + ctx.cache().prepareCacheStop(req); + } } /** @@ -673,8 +680,17 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10); } - if (startDesc != null && F.eq(startDesc.cacheConfiguration().getName(), cacheCtx.name())) - cacheCtx.preloader().onInitialExchangeComplete(err); + if (!F.isEmpty(reqs)) { + for (DynamicCacheChangeRequest req : reqs) { + if (req.isStart() && F.eq(cacheCtx.name(), req.cacheName())) + cacheCtx.preloader().onInitialExchangeComplete(err); + } + } + } + + if (!F.isEmpty(reqs)) { + for (DynamicCacheChangeRequest req : reqs) + cctx.cache().onExchangeDone(req); } cctx.exchange().onExchangeDone(this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 70af7c3..ac92e72 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -32,7 +32,11 @@ import java.util.concurrent.*; /** * Test for dynamic cache start. */ +@SuppressWarnings("unchecked") public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "TestDynamicCache"; + /** * @return Number of nodes for this test. */ @@ -53,18 +57,18 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testStartCacheMultithreadedSameNode() throws Exception { - final Collection<IgniteInternalFuture<?>> futs = new ConcurrentLinkedDeque<>(); - + public void testStartStopCacheMultithreadedSameNode() throws Exception { final IgniteKernal kernal = (IgniteKernal)grid(0); + final Collection<IgniteInternalFuture<?>> futs = new ConcurrentLinkedDeque<>(); + int threadNum = 20; GridTestUtils.runMultiThreaded(new Callable<Object>() { @Override public Object call() throws Exception { CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setName("TestCacheName"); + ccfg.setName(CACHE_NAME); futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue())); @@ -92,6 +96,21 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { assertEquals(1, succeeded); assertEquals(threadNum - 1, failed); + + futs.clear(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + futs.add(kernal.context().cache().dynamicStopCache(CACHE_NAME)); + + return null; + } + }, threadNum, "cache-stopper"); + + assertEquals(threadNum, futs.size()); + + for (IgniteInternalFuture<?> fut : futs) + fut.get(); } /** @@ -106,7 +125,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { @Override public Object call() throws Exception { CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setName("TestCacheName2"); + ccfg.setName(CACHE_NAME); IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount())); @@ -136,20 +155,35 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { assertEquals(1, succeeded); assertEquals(threadNum - 1, failed); + + futs.clear(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount())); + + futs.add(kernal.context().cache().dynamicStopCache(CACHE_NAME)); + + return null; + } + }, threadNum, "cache-stopper"); + + assertEquals(threadNum, futs.size()); + + for (IgniteInternalFuture<?> fut : futs) + fut.get(); } /** * @throws Exception If failed. */ - public void testStartCacheSimple() throws Exception { + public void testStartStopCacheSimple() throws Exception { final IgniteKernal kernal = (IgniteKernal)grid(0); CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - String cacheName = "TestCacheName3"; - - ccfg.setName(cacheName); + ccfg.setName(CACHE_NAME); kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get(); @@ -159,12 +193,41 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures()) f.get(); - assertNotNull(grid(g).jcache(cacheName)); + assertNotNull(grid(g).jcache(CACHE_NAME)); } - grid(0).jcache(cacheName).put("1", "1"); + grid(0).jcache(CACHE_NAME).put("1", "1"); + + for (int g = 0; g < nodeCount(); g++) + assertEquals("1", grid(g).jcache(CACHE_NAME).get("1")); + + // Grab caches before stop. + final IgniteCache[] caches = new IgniteCache[nodeCount()]; for (int g = 0; g < nodeCount(); g++) - assertEquals("1", grid(g).jcache(cacheName).get("1")); + caches[g] = grid(g).jcache(CACHE_NAME); + + kernal.context().cache().dynamicStopCache(CACHE_NAME).get(); + + for (int g = 0; g < nodeCount(); g++) { + final IgniteKernal kernal0 = (IgniteKernal)grid(g); + + final int idx = g; + + for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures()) + f.get(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return kernal0.jcache(CACHE_NAME); + } + }, IllegalArgumentException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return caches[idx].get("1"); + } + }, IllegalStateException.class, null); + } } }