#IGNITE-45 - WIP.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/893d0fe0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/893d0fe0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/893d0fe0 Branch: refs/heads/ignite-45 Commit: 893d0fe08ebe6502ac8efd772de3e5bc120be6f3 Parents: 3468369 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Mar 2 18:31:01 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Mar 2 18:31:01 2015 -0800 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContextImpl.java | 10 +- .../apache/ignite/internal/IgniteKernal.java | 8 +- .../discovery/GridDiscoveryManager.java | 37 +++-- .../cache/DynamicCacheDescriptor.java | 70 +++++++++ .../GridCachePartitionExchangeManager.java | 10 ++ .../processors/cache/GridCacheProcessor.java | 143 +++++++++++++++++++ .../spi/discovery/DiscoverySpiListener.java | 3 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 55 +++---- .../cache/IgniteDynamicCacheStartSelfTest.java | 139 ++++++++++++++++++ 9 files changed, 438 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 756c16a..7fb080d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -381,6 +381,13 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * 
@param comp Manager to add. */ public void add(GridComponent comp) { + add(comp, true); + } + + /** + * @param comp Manager to add. + */ + public void add(GridComponent comp, boolean addToList) { assert comp != null; /* @@ -471,7 +478,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable else assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass(); - comps.add(comp); + if (addToList) + comps.add(comp); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index f46d071..b17ed37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -740,6 +740,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ackSecurity(ctx); + // Assign discovery manager to context before other processors start so they + // are able to register custom event listener. + GridManager discoMgr = new GridDiscoveryManager(ctx); + + ctx.add(discoMgr, false); + // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. startProcessor(ctx, new GridClockSyncProcessor(ctx), attrs); @@ -776,7 +782,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { gw.setState(STARTED); // Start discovery manager last to make sure that grid is fully initialized. 
- startManager(ctx, new GridDiscoveryManager(ctx), attrs); + startManager(ctx, discoMgr, attrs); } finally { gw.writeUnlock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 68f0a4a..dce04e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -163,6 +163,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Metrics update worker. */ private final MetricsUpdater metricsUpdater = new MetricsUpdater(); + /** Custom event listener. */ + private GridPlainInClosure<Serializable> customEvtLsnr; + /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); @@ -304,6 +307,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return; } + if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + try { + customEvtLsnr.apply(data); + } + catch (Exception e) { + U.error(log, "Failed to notify direct custom event listener: " + data, e); + } + } + if (topVer > 0 && (type == EVT_NODE_JOINED || type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)) { boolean set = GridDiscoveryManager.this.topVer.setIfGreater(topVer); @@ -380,6 +392,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param customEvtLsnr Custom event listener. 
+ */ + public void setCustomEventListener(GridPlainInClosure<Serializable> customEvtLsnr) { + this.customEvtLsnr = customEvtLsnr; + } + + /** * @return Metrics. */ private GridLocalMetrics createMetrics() { @@ -1488,17 +1507,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: { - DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent(); - - customEvt.node(ctx.discovery().localNode()); - customEvt.eventNode(node); - customEvt.type(type); - customEvt.topologySnapshot(topVer, null); - customEvt.data(evt.get5()); + if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) { + DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent(); - assert ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT); + customEvt.node(ctx.discovery().localNode()); + customEvt.eventNode(node); + customEvt.type(type); + customEvt.topologySnapshot(topVer, null); + customEvt.data(evt.get5()); - ctx.event().record(customEvt); + ctx.event().record(customEvt); + } return; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java new file mode 100644 index 0000000..196730c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.io.*; + +/** + * Cache start descriptor. + */ +public class DynamicCacheDescriptor implements Serializable { + /** Cache configuration. */ + @GridToStringExclude + private CacheConfiguration cacheCfg; + + /** Deploy filter bytes. */ + @GridToStringExclude + private byte[] deployFltrBytes; + + /** Cache start ID. */ + private IgniteUuid startId; + + /** + * @param cacheCfg Cache configuration. + * @param deployFltrBytes Deployment filter bytes. + */ + public DynamicCacheDescriptor(CacheConfiguration cacheCfg, byte[] deployFltrBytes, IgniteUuid startId) { + this.cacheCfg = cacheCfg; + this.deployFltrBytes = deployFltrBytes; + this.startId = startId; + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration cacheConfiguration() { + return cacheCfg; + } + + /** + * @return Start ID. 
+ */ + public IgniteUuid startId() { + return startId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DynamicCacheDescriptor.class, this, "cacheName", cacheCfg.getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 246ff37..09edf52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -385,6 +385,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * Callback to start exchange for dynamically started cache. + * + * @param cacheDesc Cache descriptor. + */ + public void onCacheDeployed(DynamicCacheDescriptor cacheDesc) { + // TODO IGNITE-45 move to exchange future. + cctx.kernalContext().cache().onCacheStartFinished(cacheDesc); + } + + /** * @return {@code True} if topology has changed. 
*/ public boolean topologyChanged() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index f74f969..607204e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -44,8 +44,10 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; @@ -53,7 +55,9 @@ import org.jetbrains.annotations.*; import javax.cache.configuration.*; import javax.cache.integration.*; import javax.management.*; +import java.io.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.IgniteSystemProperties.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; @@ -102,6 +106,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Transaction interface implementation. */ private IgniteTransactionsImpl transactions; + /** Pending cache starts. */ + private ConcurrentMap<String, IgniteInternalFuture> pendingStarts = new ConcurrentHashMap<>(); + + /** Dynamic caches. 
*/ + private ConcurrentMap<String, DynamicCacheDescriptor> dynamicCaches = new ConcurrentHashMap<>(); + /** * @param ctx Kernal context. */ @@ -558,6 +568,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { maxPreloadOrder = validatePreloadOrder(ctx.config().getCacheConfiguration()); + ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() { + @Override public void apply(Serializable evt) { + if (evt instanceof DynamicCacheDescriptor) + onCacheDeploymentRequested((DynamicCacheDescriptor)evt); + } + }); + // Internal caches which should not be returned to user. IgfsConfiguration[] igfsCfgs = ctx.grid().configuration().getIgfsConfiguration(); @@ -916,6 +933,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Callback invoked when first exchange future for dynamic cache is completed. + * + * @param startDesc Cache start descriptor. + */ + public void onCacheStartFinished(DynamicCacheDescriptor startDesc) { + CacheConfiguration ccfg = startDesc.cacheConfiguration(); + + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName()); + + if (fut != null && fut.startId().equals(startDesc.startId())) { + fut.onDone(); + + pendingStarts.remove(ccfg.getName(), fut); + } + } + + /** * Creates shared context. * * @param kernalCtx Kernal context. @@ -972,6 +1006,89 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Dynamically starts cache. + * + * @param ccfg Cache configuration. + * @param nodeFilter Node filter to select nodes on which the cache should be deployed. + * @return Future that will be completed when cache is deployed. 
+ */ + public IgniteInternalFuture<?> startCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) { + if (nodeFilter == null) + nodeFilter = F.alwaysTrue(); + + DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, IgniteUuid.fromUuid(ctx.localNodeId())); + + try { + byte[] filterBytes = ctx.config().getMarshaller().marshal(nodeFilter); + + for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) { + if (ccfg0.getName().equals(ccfg.getName())) + return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already configured): " + ccfg.getName())); + } + + if (caches.containsKey(ccfg.getName())) + return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already started): " + ccfg.getName())); + + IgniteInternalFuture<?> old = pendingStarts.putIfAbsent(ccfg.getName(), fut); + + if (old != null) + return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already started): " + ccfg.getName())); + + ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, filterBytes, fut.startId())); + + return fut; + } + catch (IgniteCheckedException e) { + fut.onDone(e); + + // Safety. + pendingStarts.remove(ccfg.getName(), fut); + + return fut; + } + } + + /** + * Callback invoked from discovery thread when cache deployment request is received. + * + * @param startDesc Cache start descriptor. + */ + private void onCacheDeploymentRequested(DynamicCacheDescriptor startDesc) { + // TODO IGNITE-45 remove debug + U.debug(log, "Received start notification: " + startDesc); + + CacheConfiguration ccfg = startDesc.cacheConfiguration(); + + // Check if cache with the same name was concurrently started from a different node. + if (dynamicCaches.containsKey(ccfg.getName())) { + // If local node initiated start, fail the start future. 
+ DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName()); + + if (startFut != null && startFut.startId().equals(startDesc.startId())) { + assert !startFut.syncNotify(); + + startFut.onDone(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already started): " + ccfg.getName())); + + pendingStarts.remove(ccfg.getName(), startFut); + } + + return; + } + + DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc); + + assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; + + // TODO IGNITE-45 create cache context here. + + sharedCtx.exchange().onCacheDeployed(startDesc); + } + + /** * Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode. * * @param cfgs Caches. @@ -1858,6 +1975,32 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * */ + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + private static class DynamicCacheStartFuture extends GridFutureAdapter<Object> { + /** Start ID. */ + private IgniteUuid startId; + + /** + * @param ctx Kernal context. + */ + private DynamicCacheStartFuture(GridKernalContext ctx, IgniteUuid startId) { + // Start future can be completed from discovery thread, notification must NOT be sync. + super(ctx, false); + + this.startId = startId; + } + + /** + * @return Start ID. 
+ */ + private IgniteUuid startId() { + return startId; + } + } + + /** + * + */ private static class LocalAffinityFunction implements CacheAffinityFunction { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java index 243aaeb..7f17fe4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery; import org.apache.ignite.cluster.*; +import org.apache.ignite.events.DiscoveryEvent; import org.jetbrains.annotations.*; import java.io.*; @@ -31,7 +32,7 @@ public interface DiscoverySpiListener { /** * Notification for grid node discovery events. * - * @param type Node discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} + * @param type Node discovery event type. See {@link DiscoveryEvent} * @param topVer Topology version or {@code 0} if configured discovery SPI implementation * does not support versioning. * @param node Node affected (e.g. newly joined node, left node, failed node or local node). 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index df39d6b..3800783 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.*; import org.apache.ignite.internal.events.*; import org.apache.ignite.internal.processors.security.*; @@ -35,7 +36,11 @@ import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.discovery.*; import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -60,7 +65,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe * done across it. 
* <p> * At startup SPI tries to send messages to random IP taken from - * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} about self start (stops when send succeeds) + * {@link TcpDiscoveryIpFinder} about self start (stops when send succeeds) * and then this info goes to coordinator. When coordinator processes join request * and issues node added messages and all other nodes then receive info about new node. * <h1 class="header">Configuration</h1> @@ -70,14 +75,14 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe * The following configuration parameters are optional: * <ul> * <li>IP finder to share info about nodes IP addresses - * (see {@link #setIpFinder(org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder)}). + * (see {@link #setIpFinder(TcpDiscoveryIpFinder)}). * See the following IP finder implementations for details on configuration: * <ul> - * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder}</li> + * <li>{@link TcpDiscoverySharedFsIpFinder}</li> * <li>{@ignitelink org.apache.ignite.spi.discovery.tcp.ipfinder.s3.TcpDiscoveryS3IpFinder}</li> - * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder}</li> - * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}</li> - * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} - default</li> + * <li>{@link TcpDiscoveryJdbcIpFinder}</li> + * <li>{@link TcpDiscoveryVmIpFinder}</li> + * <li>{@link TcpDiscoveryMulticastIpFinder} - default</li> * </ul> * </li> * </ul> @@ -136,7 +141,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe * <img src="http://www.gridgain.com/images/spring-small.png"> * <br> * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - * @see 
org.apache.ignite.spi.discovery.DiscoverySpi + * @see DiscoverySpi */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") @IgniteSpiMultipleInstancesSupport(true) @@ -373,7 +378,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * * @param joinTimeout Join timeout ({@code 0} means wait forever). * - * @see org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared() + * @see TcpDiscoveryIpFinder#isShared() */ @IgniteSpiConfiguration(optional = true) public void setJoinTimeout(long joinTimeout) { @@ -659,7 +664,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Starts or restarts SPI after stop (to reconnect). * * @param restart {@code True} if SPI is restarted after stop. - * @throws org.apache.ignite.spi.IgniteSpiException If failed. + * @throws IgniteSpiException If failed. */ private void spiStart0(boolean restart) throws IgniteSpiException { if (!restart) @@ -772,7 +777,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** - * @throws org.apache.ignite.spi.IgniteSpiException If failed. + * @throws IgniteSpiException If failed. */ @SuppressWarnings("BusyWait") private void registerLocalNodeAddress() throws IgniteSpiException { @@ -803,7 +808,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** - * @throws org.apache.ignite.spi.IgniteSpiException If failed. + * @throws IgniteSpiException If failed. */ private void onSpiStart() throws IgniteSpiException { startStopwatch(); @@ -904,7 +909,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Stops SPI finally or stops SPI for restart. * * @param disconnect {@code True} if SPI is being disconnected. - * @throws org.apache.ignite.spi.IgniteSpiException If failed. + * @throws IgniteSpiException If failed. 
*/ private void spiStop0(boolean disconnect) throws IgniteSpiException { if (ctxInitLatch.getCount() > 0) @@ -1059,7 +1064,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + * @throws IgniteSpiException If any error occurs. * @return {@code true} if IP finder contains local address. */ private boolean ipFinderHasLocalAddress() throws IgniteSpiException { @@ -1075,8 +1080,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (resolved.equals(locAddr)) return true; } - catch (UnknownHostException ignored) { - onException(ignored.getMessage(), ignored); + catch (UnknownHostException e) { + onException(e.getMessage(), e); } } @@ -1153,7 +1158,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * * @param addr Address of the node. * @return ID of the remote node if node alive. - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. + * @throws IgniteSpiException If an error occurs. */ private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId) throws IgniteCheckedException { @@ -1251,7 +1256,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * Tries to join this node to topology. * - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + * @throws IgniteSpiException If any error occurs. */ private void joinTopology() throws IgniteSpiException { synchronized (mux) { @@ -1378,11 +1383,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * Tries to send join request message to a random node presenting in topology. 
- * Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is + * Address is provided by {@link TcpDiscoveryIpFinder} and message is * sent to first node connection succeeded to. * * @return {@code true} if send succeeded. - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + * @throws IgniteSpiException If any error occurs. */ @SuppressWarnings({"BusyWait"}) private boolean sendJoinRequestMessage() throws IgniteSpiException { @@ -1726,7 +1731,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * Notify external listener on discovery event. * - * @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} for more details. + * @param type Discovery event type. See {@link DiscoveryEvent} for more details. * @param topVer Topology version. * @param node Remote node this event is connected with. */ @@ -1743,7 +1748,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - Collection<ClusterNode> top = F.upcast(ring.visibleNodes()); + Collection<ClusterNode> top = F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleNodes()); Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); @@ -2237,7 +2242,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * Thread that sends status check messages to next node if local node has not - * been receiving heartbeats ({@link org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage}) + * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage}) * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} * * {@link TcpDiscoverySpi#getHeartbeatFrequency()}. 
*/ @@ -2303,7 +2308,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * addresses of the nodes that has left the topology. * <p> * This thread should run only on coordinator node and will clean IP finder - * if and only if {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} is {@code true}. + * if and only if {@link TcpDiscoveryIpFinder#isShared()} is {@code true}. */ private class IpFinderCleaner extends IgniteSpiThread { /** @@ -3956,11 +3961,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov try { ipFinder.unregisterAddresses(leftNode.socketAddresses()); } - catch (IgniteSpiException ignored) { + catch (IgniteSpiException e) { if (log.isDebugEnabled()) log.debug("Failed to unregister left node address: " + leftNode); - onException("Failed to unregister left node address: " + leftNode, ignored); + onException("Failed to unregister left node address: " + leftNode, e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java new file mode 100644 index 0000000..dddf4a2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Test for dynamic cache start. + */ +public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { + /** + * @return Number of nodes for this test. + */ + public int nodeCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(nodeCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testStartCacheMultithreadedSameNode() throws Exception { + final Collection<IgniteInternalFuture<?>> futs = new ConcurrentLinkedDeque<>(); + + final IgniteKernal kernal = (IgniteKernal)grid(0); + + int threadNum = 20; + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName("TestCacheName"); + + futs.add(kernal.context().cache().startCache(ccfg, F.<ClusterNode>alwaysTrue())); + + return null; + } + }, threadNum, "cache-starter"); + + assertEquals(threadNum, futs.size()); + + int succeeded = 0; + int failed = 0; + + for (IgniteInternalFuture<?> fut : futs) { + try { + fut.get(); + + succeeded++; + } + catch (IgniteCheckedException e) { + info(e.getMessage()); + + failed++; + } + } + + assertEquals(1, succeeded); + assertEquals(threadNum - 1, failed); + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheMultithreadedDifferentNodes() throws Exception { + final Collection<IgniteInternalFuture<?>> futs = new ConcurrentLinkedDeque<>(); + + int threadNum = 20; + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName("TestCacheName2"); + + IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount())); + + futs.add(kernal.context().cache().startCache(ccfg, F.<ClusterNode>alwaysTrue())); + + return null; + } + }, threadNum, "cache-starter"); + + assertEquals(threadNum, futs.size()); + + int succeeded = 0; + int failed = 0; + + for (IgniteInternalFuture<?> fut : futs) { + try { + fut.get(); + + succeeded++; + } + catch (IgniteCheckedException e) { + info(e.getMessage()); + + failed++; + } + } + + assertEquals(1, succeeded); + assertEquals(threadNum - 1, failed); + } +}