#IGNITE-45 - WIP.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/893d0fe0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/893d0fe0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/893d0fe0

Branch: refs/heads/ignite-45
Commit: 893d0fe08ebe6502ac8efd772de3e5bc120be6f3
Parents: 3468369
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Mon Mar 2 18:31:01 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Mon Mar 2 18:31:01 2015 -0800

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContextImpl.java  |  10 +-
 .../apache/ignite/internal/IgniteKernal.java    |   8 +-
 .../discovery/GridDiscoveryManager.java         |  37 +++--
 .../cache/DynamicCacheDescriptor.java           |  70 +++++++++
 .../GridCachePartitionExchangeManager.java      |  10 ++
 .../processors/cache/GridCacheProcessor.java    | 143 +++++++++++++++++++
 .../spi/discovery/DiscoverySpiListener.java     |   3 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  55 +++----
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 139 ++++++++++++++++++
 9 files changed, 438 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 756c16a..7fb080d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -381,6 +381,13 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
      * @param comp Manager to add.
      */
     public void add(GridComponent comp) {
+        add(comp, true);
+    }
+
+    /**
+     * @param comp Manager to add.
+     */
+    public void add(GridComponent comp, boolean addToList) {
         assert comp != null;
 
         /*
@@ -471,7 +478,8 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         else
             assert (comp instanceof GridPluginComponent) : "Unknown manager 
class: " + comp.getClass();
 
-        comps.add(comp);
+        if (addToList)
+            comps.add(comp);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f46d071..b17ed37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -740,6 +740,12 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
 
             ackSecurity(ctx);
 
+            // Assign discovery manager to context before other processors 
start so they
+            // are able to register custom event listener.
+            GridManager discoMgr = new GridDiscoveryManager(ctx);
+
+            ctx.add(discoMgr, false);
+
             // Start processors before discovery manager, so they will
             // be able to start receiving messages once discovery completes.
             startProcessor(ctx, new GridClockSyncProcessor(ctx), attrs);
@@ -776,7 +782,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                 gw.setState(STARTED);
 
                 // Start discovery manager last to make sure that grid is 
fully initialized.
-                startManager(ctx, new GridDiscoveryManager(ctx), attrs);
+                startManager(ctx, discoMgr, attrs);
             }
             finally {
                 gw.writeUnlock();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 68f0a4a..dce04e2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -163,6 +163,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /** Metrics update worker. */
     private final MetricsUpdater metricsUpdater = new MetricsUpdater();
 
+    /** Custom event listener. */
+    private GridPlainInClosure<Serializable> customEvtLsnr;
+
     /** @param ctx Context. */
     public GridDiscoveryManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getDiscoverySpi());
@@ -304,6 +307,15 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     return;
                 }
 
+                if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                    try {
+                        customEvtLsnr.apply(data);
+                    }
+                    catch (Exception e) {
+                        U.error(log, "Failed to notify direct custom event 
listener: " + data, e);
+                    }
+                }
+
                 if (topVer > 0 && (type == EVT_NODE_JOINED || type == 
EVT_NODE_FAILED || type == EVT_NODE_LEFT)) {
                     boolean set = 
GridDiscoveryManager.this.topVer.setIfGreater(topVer);
 
@@ -380,6 +392,13 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param customEvtLsnr Custom event listener.
+     */
+    public void setCustomEventListener(GridPlainInClosure<Serializable> 
customEvtLsnr) {
+        this.customEvtLsnr = customEvtLsnr;
+    }
+
+    /**
      * @return Metrics.
      */
     private GridLocalMetrics createMetrics() {
@@ -1488,17 +1507,17 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 }
 
                 case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
-                    DiscoveryCustomEvent customEvt = new 
DiscoveryCustomEvent();
-
-                    customEvt.node(ctx.discovery().localNode());
-                    customEvt.eventNode(node);
-                    customEvt.type(type);
-                    customEvt.topologySnapshot(topVer, null);
-                    customEvt.data(evt.get5());
+                    if 
(ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) {
+                        DiscoveryCustomEvent customEvt = new 
DiscoveryCustomEvent();
 
-                    assert 
ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT);
+                        customEvt.node(ctx.discovery().localNode());
+                        customEvt.eventNode(node);
+                        customEvt.type(type);
+                        customEvt.topologySnapshot(topVer, null);
+                        customEvt.data(evt.get5());
 
-                    ctx.event().record(customEvt);
+                        ctx.event().record(customEvt);
+                    }
 
                     return;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
new file mode 100644
index 0000000..196730c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.io.*;
+
+/**
+ * Cache start descriptor.
+ */
+public class DynamicCacheDescriptor implements Serializable {
+    /** Cache configuration. */
+    @GridToStringExclude
+    private CacheConfiguration cacheCfg;
+
+    /** Deploy filter bytes. */
+    @GridToStringExclude
+    private byte[] deployFltrBytes;
+
+    /** Cache start ID. */
+    private IgniteUuid startId;
+
+    /**
+     * @param cacheCfg Cache configuration.
+     * @param deployFltrBytes Deployment filter bytes.
+     */
+    public DynamicCacheDescriptor(CacheConfiguration cacheCfg, byte[] 
deployFltrBytes, IgniteUuid startId) {
+        this.cacheCfg = cacheCfg;
+        this.deployFltrBytes = deployFltrBytes;
+        this.startId = startId;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    public CacheConfiguration cacheConfiguration() {
+        return cacheCfg;
+    }
+
+    /**
+     * @return Start ID.
+     */
+    public IgniteUuid startId() {
+        return startId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DynamicCacheDescriptor.class, this, "cacheName", 
cacheCfg.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 246ff37..09edf52 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -385,6 +385,16 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
+     * Callback to start exchange for dynamically started cache.
+     *
+     * @param cacheDesc Cache descriptor.
+     */
+    public void onCacheDeployed(DynamicCacheDescriptor cacheDesc) {
+        // TODO IGNITE-45 move to exchange future.
+        cctx.kernalContext().cache().onCacheStartFinished(cacheDesc);
+    }
+
+    /**
      * @return {@code True} if topology has changed.
      */
     public boolean topologyChanged() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f74f969..607204e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -44,8 +44,10 @@ import 
org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.spi.*;
 import org.jetbrains.annotations.*;
@@ -53,7 +55,9 @@ import org.jetbrains.annotations.*;
 import javax.cache.configuration.*;
 import javax.cache.integration.*;
 import javax.management.*;
+import java.io.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static org.apache.ignite.IgniteSystemProperties.*;
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
@@ -102,6 +106,12 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** Transaction interface implementation. */
     private IgniteTransactionsImpl transactions;
 
+    /** Pending cache starts. */
+    private ConcurrentMap<String, IgniteInternalFuture> pendingStarts = new 
ConcurrentHashMap<>();
+
+    /** Dynamic caches. */
+    private ConcurrentMap<String, DynamicCacheDescriptor> dynamicCaches = new 
ConcurrentHashMap<>();
+
     /**
      * @param ctx Kernal context.
      */
@@ -558,6 +568,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         maxPreloadOrder = 
validatePreloadOrder(ctx.config().getCacheConfiguration());
 
+        ctx.discovery().setCustomEventListener(new 
GridPlainInClosure<Serializable>() {
+            @Override public void apply(Serializable evt) {
+                if (evt instanceof DynamicCacheDescriptor)
+                    onCacheDeploymentRequested((DynamicCacheDescriptor)evt);
+            }
+        });
+
         // Internal caches which should not be returned to user.
         IgfsConfiguration[] igfsCfgs = 
ctx.grid().configuration().getIgfsConfiguration();
 
@@ -916,6 +933,23 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Callback invoked when first exchange future for dynamic cache is 
completed.
+     *
+     * @param startDesc Cache start descriptor.
+     */
+    public void onCacheStartFinished(DynamicCacheDescriptor startDesc) {
+        CacheConfiguration ccfg = startDesc.cacheConfiguration();
+
+        DynamicCacheStartFuture fut = 
(DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());
+
+        if (fut != null && fut.startId().equals(startDesc.startId())) {
+            fut.onDone();
+
+            pendingStarts.remove(ccfg.getName(), fut);
+        }
+    }
+
+    /**
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
@@ -972,6 +1006,89 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Dynamically starts cache.
+     *
+     * @param ccfg Cache configuration.
+     * @param nodeFilter Node filter to select nodes on which the cache should 
be deployed.
+     * @return Future that will be completed when cache is deployed.
+     */
+    public IgniteInternalFuture<?> startCache(CacheConfiguration ccfg, 
IgnitePredicate<ClusterNode> nodeFilter) {
+        if (nodeFilter == null)
+            nodeFilter = F.alwaysTrue();
+
+        DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, 
IgniteUuid.fromUuid(ctx.localNodeId()));
+
+        try {
+            byte[] filterBytes = 
ctx.config().getMarshaller().marshal(nodeFilter);
+
+            for (CacheConfiguration ccfg0 : 
ctx.config().getCacheConfiguration()) {
+                if (ccfg0.getName().equals(ccfg.getName()))
+                    return new GridFinishedFutureEx<>(new 
IgniteCheckedException("Failed to start cache " +
+                        "(a cache with the same name is already configured): " 
+ ccfg.getName()));
+            }
+
+            if (caches.containsKey(ccfg.getName()))
+                return new GridFinishedFutureEx<>(new 
IgniteCheckedException("Failed to start cache " +
+                    "(a cache with the same name is already started): " + 
ccfg.getName()));
+
+            IgniteInternalFuture<?> old = 
pendingStarts.putIfAbsent(ccfg.getName(), fut);
+
+            if (old != null)
+                return new GridFinishedFutureEx<>(new 
IgniteCheckedException("Failed to start cache " +
+                    "(a cache with the same name is already started): " + 
ccfg.getName()));
+
+            ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, 
filterBytes, fut.startId()));
+
+            return fut;
+        }
+        catch (IgniteCheckedException e) {
+            fut.onDone(e);
+
+            // Safety.
+            pendingStarts.remove(ccfg.getName(), fut);
+
+            return fut;
+        }
+    }
+
+    /**
+     * Callback invoked from discovery thread when cache deployment request is 
received.
+     *
+     * @param startDesc Cache start descriptor.
+     */
+    private void onCacheDeploymentRequested(DynamicCacheDescriptor startDesc) {
+        // TODO IGNITE-45 remove debug
+        U.debug(log, "Received start notification: " + startDesc);
+
+        CacheConfiguration ccfg = startDesc.cacheConfiguration();
+
+        // Check if cache with the same name was concurrently started form 
different node.
+        if (dynamicCaches.containsKey(ccfg.getName())) {
+            // If local node initiated start, fail the start future.
+            DynamicCacheStartFuture startFut = 
(DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());
+
+            if (startFut != null && 
startFut.startId().equals(startDesc.startId())) {
+                assert !startFut.syncNotify();
+
+                startFut.onDone(new IgniteCheckedException("Failed to start 
cache " +
+                        "(a cache with the same name is already started): " + 
ccfg.getName()));
+
+                pendingStarts.remove(ccfg.getName(), startFut);
+            }
+
+            return;
+        }
+
+        DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), 
startDesc);
+
+        assert old == null : "Dynamic cache map was concurrently modified 
[new=" + startDesc + ", old=" + old + ']';
+
+        // TODO IGNITE-45 create cache context here.
+
+        sharedCtx.exchange().onCacheDeployed(startDesc);
+    }
+
+    /**
      * Checks that preload-order-dependant caches has SYNC or ASYNC preloading 
mode.
      *
      * @param cfgs Caches.
@@ -1858,6 +1975,32 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /**
      *
      */
+    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+    private static class DynamicCacheStartFuture extends 
GridFutureAdapter<Object> {
+        /** Start ID. */
+        private IgniteUuid startId;
+
+        /**
+         * @param ctx Kernal context.
+         */
+        private DynamicCacheStartFuture(GridKernalContext ctx, IgniteUuid 
startId) {
+            // Start future can be completed from discovery thread, 
notification must NOT be sync.
+            super(ctx, false);
+
+            this.startId = startId;
+        }
+
+        /**
+         * @return Start ID.
+         */
+        private IgniteUuid startId() {
+            return startId;
+        }
+    }
+
+    /**
+     *
+     */
     private static class LocalAffinityFunction implements 
CacheAffinityFunction {
         /** */
         private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
index 243aaeb..7f17fe4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery;
 
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.DiscoveryEvent;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -31,7 +32,7 @@ public interface DiscoverySpiListener {
     /**
      * Notification for grid node discovery events.
      *
-     * @param type Node discovery event type. See {@link 
org.apache.ignite.events.DiscoveryEvent}
+     * @param type Node discovery event type. See {@link DiscoveryEvent}
      * @param topVer Topology version or {@code 0} if configured discovery SPI 
implementation
      *      does not support versioning.
      * @param node Node affected (e.g. newly joined node, left node, failed 
node or local node).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index df39d6b..3800783 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.events.*;
 import org.apache.ignite.internal.processors.security.*;
@@ -35,7 +36,11 @@ import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.discovery.*;
 import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import 
org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
+import 
org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
@@ -60,7 +65,7 @@ import static 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
  * done across it.
  * <p>
  * At startup SPI tries to send messages to random IP taken from
- * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} 
about self start (stops when send succeeds)
+ * {@link TcpDiscoveryIpFinder} about self start (stops when send succeeds)
  * and then this info goes to coordinator. When coordinator processes join 
request
  * and issues node added messages and all other nodes then receive info about 
new node.
  * <h1 class="header">Configuration</h1>
@@ -70,14 +75,14 @@ import static 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
  * The following configuration parameters are optional:
  * <ul>
  * <li>IP finder to share info about nodes IP addresses
- * (see {@link 
#setIpFinder(org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder)}).
+ * (see {@link #setIpFinder(TcpDiscoveryIpFinder)}).
  * See the following IP finder implementations for details on configuration:
  * <ul>
- * <li>{@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder}</li>
+ * <li>{@link TcpDiscoverySharedFsIpFinder}</li>
  * <li>{@ignitelink 
org.apache.ignite.spi.discovery.tcp.ipfinder.s3.TcpDiscoveryS3IpFinder}</li>
- * <li>{@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder}</li>
- * <li>{@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}</li>
- * <li>{@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder}
 - default</li>
+ * <li>{@link TcpDiscoveryJdbcIpFinder}</li>
+ * <li>{@link TcpDiscoveryVmIpFinder}</li>
+ * <li>{@link TcpDiscoveryMulticastIpFinder} - default</li>
  * </ul>
  * </li>
  * </ul>
@@ -136,7 +141,7 @@ import static 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
  * <img src="http://www.gridgain.com/images/spring-small.png";>
  * <br>
  * For information about Spring framework visit <a 
href="http://www.springframework.org/";>www.springframework.org</a>
- * @see org.apache.ignite.spi.discovery.DiscoverySpi
+ * @see DiscoverySpi
  */
 @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
 @IgniteSpiMultipleInstancesSupport(true)
@@ -373,7 +378,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter 
implements TcpDiscov
      *
      * @param joinTimeout Join timeout ({@code 0} means wait forever).
      *
-     * @see 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()
+     * @see TcpDiscoveryIpFinder#isShared()
      */
     @IgniteSpiConfiguration(optional = true)
     public void setJoinTimeout(long joinTimeout) {
@@ -659,7 +664,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter 
implements TcpDiscov
      * Starts or restarts SPI after stop (to reconnect).
      *
      * @param restart {@code True} if SPI is restarted after stop.
-     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     * @throws IgniteSpiException If failed.
      */
     private void spiStart0(boolean restart) throws IgniteSpiException {
         if (!restart)
@@ -772,7 +777,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter 
implements TcpDiscov
     }
 
     /**
-     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     * @throws IgniteSpiException If failed.
      */
     @SuppressWarnings("BusyWait")
     private void registerLocalNodeAddress() throws IgniteSpiException {
@@ -803,7 +808,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter 
implements TcpDiscov
     }
 
     /**
-     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     * @throws IgniteSpiException If failed.
      */
     private void onSpiStart() throws IgniteSpiException {
         startStopwatch();
@@ -904,7 +909,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter 
implements TcpDiscov
      * Stops SPI finally or stops SPI for restart.
      *
      * @param disconnect {@code True} if SPI is being disconnected.
-     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     * @throws IgniteSpiException If failed.
      */
     private void spiStop0(boolean disconnect) throws IgniteSpiException {
         if (ctxInitLatch.getCount() > 0)
@@ -1059,7 +1064,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
     }
 
     /**
-     * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
+     * @throws IgniteSpiException If any error occurs.
      * @return {@code true} if IP finder contains local address.
      */
     private boolean ipFinderHasLocalAddress() throws IgniteSpiException {
@@ -1075,8 +1080,8 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                     if (resolved.equals(locAddr))
                         return true;
                 }
-                catch (UnknownHostException ignored) {
-                    onException(ignored.getMessage(), ignored);
+                catch (UnknownHostException e) {
+                    onException(e.getMessage(), e);
                 }
         }
 
@@ -1153,7 +1158,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
      *
      * @param addr Address of the node.
      * @return ID of the remote node if node alive.
-     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+     * @throws IgniteSpiException If an error occurs.
      */
     private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, 
@Nullable UUID clientNodeId)
         throws IgniteCheckedException {
@@ -1251,7 +1256,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
     /**
      * Tries to join this node to topology.
      *
-     * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
+     * @throws IgniteSpiException If any error occurs.
      */
     private void joinTopology() throws IgniteSpiException {
         synchronized (mux) {
@@ -1378,11 +1383,11 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
     /**
      * Tries to send join request message to a random node presenting in 
topology.
-     * Address is provided by {@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message 
is
+     * Address is provided by {@link TcpDiscoveryIpFinder} and message is
      * sent to first node connection succeeded to.
      *
      * @return {@code true} if send succeeded.
-     * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
+     * @throws IgniteSpiException If any error occurs.
      */
     @SuppressWarnings({"BusyWait"})
     private boolean sendJoinRequestMessage() throws IgniteSpiException {
@@ -1726,7 +1731,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
     /**
      * Notify external listener on discovery event.
      *
-     * @param type Discovery event type. See {@link 
org.apache.ignite.events.DiscoveryEvent} for more details.
+     * @param type Discovery event type. See {@link DiscoveryEvent} for more 
details.
      * @param topVer Topology version.
      * @param node Remote node this event is connected with.
      */
@@ -1743,7 +1748,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 log.debug("Discovery notification [node=" + node + ", 
spiState=" + spiState +
                     ", type=" + U.gridEventName(type) + ", topVer=" + topVer + 
']');
 
-            Collection<ClusterNode> top = F.upcast(ring.visibleNodes());
+            Collection<ClusterNode> top = F.<TcpDiscoveryNode, 
ClusterNode>upcast(ring.visibleNodes());
 
             Map<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer, top);
 
@@ -2237,7 +2242,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
     /**
      * Thread that sends status check messages to next node if local node has 
not
-     * been receiving heartbeats ({@link 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage})
+     * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
      * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
      * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
      */
@@ -2303,7 +2308,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
      * addresses of the nodes that has left the topology.
      * <p>
      * This thread should run only on coordinator node and will clean IP finder
-     * if and only if {@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} 
is {@code true}.
+     * if and only if {@link TcpDiscoveryIpFinder#isShared()} is {@code true}.
      */
     private class IpFinderCleaner extends IgniteSpiThread {
         /**
@@ -3956,11 +3961,11 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                         try {
                             
ipFinder.unregisterAddresses(leftNode.socketAddresses());
                         }
-                        catch (IgniteSpiException ignored) {
+                        catch (IgniteSpiException e) {
                             if (log.isDebugEnabled())
                                 log.debug("Failed to unregister left node 
address: " + leftNode);
 
-                            onException("Failed to unregister left node 
address: " + leftNode, ignored);
+                            onException("Failed to unregister left node 
address: " + leftNode, e);
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/893d0fe0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
new file mode 100644
index 0000000..dddf4a2
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Test for dynamic cache start.
+ */
+public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
+    /**
+     * @return Number of nodes for this test.
+     */
+    public int nodeCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(nodeCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheMultithreadedSameNode() throws Exception {
+        final Collection<IgniteInternalFuture<?>> futs = new 
ConcurrentLinkedDeque<>();
+
+        final IgniteKernal kernal = (IgniteKernal)grid(0);
+
+        int threadNum = 20;
+
+        GridTestUtils.runMultiThreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                CacheConfiguration ccfg = new CacheConfiguration();
+
+                ccfg.setName("TestCacheName");
+
+                futs.add(kernal.context().cache().startCache(ccfg, 
F.<ClusterNode>alwaysTrue()));
+
+                return null;
+            }
+        }, threadNum, "cache-starter");
+
+        assertEquals(threadNum, futs.size());
+
+        int succeeded = 0;
+        int failed = 0;
+
+        for (IgniteInternalFuture<?> fut : futs) {
+            try {
+                fut.get();
+
+                succeeded++;
+            }
+            catch (IgniteCheckedException e) {
+                info(e.getMessage());
+
+                failed++;
+            }
+        }
+
+        assertEquals(1, succeeded);
+        assertEquals(threadNum - 1, failed);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheMultithreadedDifferentNodes() throws Exception {
+        final Collection<IgniteInternalFuture<?>> futs = new 
ConcurrentLinkedDeque<>();
+
+        int threadNum = 20;
+
+        GridTestUtils.runMultiThreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                CacheConfiguration ccfg = new CacheConfiguration();
+
+                ccfg.setName("TestCacheName2");
+
+                IgniteKernal kernal = 
(IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount()));
+
+                futs.add(kernal.context().cache().startCache(ccfg, 
F.<ClusterNode>alwaysTrue()));
+
+                return null;
+            }
+        }, threadNum, "cache-starter");
+
+        assertEquals(threadNum, futs.size());
+
+        int succeeded = 0;
+        int failed = 0;
+
+        for (IgniteInternalFuture<?> fut : futs) {
+            try {
+                fut.get();
+
+                succeeded++;
+            }
+            catch (IgniteCheckedException e) {
+                info(e.getMessage());
+
+                failed++;
+            }
+        }
+
+        assertEquals(1, succeeded);
+        assertEquals(threadNum - 1, failed);
+    }
+}

Reply via email to