IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a6a5e48a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a6a5e48a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a6a5e48a Branch: refs/heads/ignite-421 Commit: a6a5e48ad194229ba20e6c806e62f867a5ba9234 Parents: 1cd95ae Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Mar 9 20:25:21 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Mar 9 20:25:21 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/ignite/Ignite.java | 21 +- .../apache/ignite/IgniteSystemProperties.java | 5 + .../main/java/org/apache/ignite/Ignition.java | 18 + .../configuration/CacheConfiguration.java | 38 ++ .../configuration/ClientCacheConfiguration.java | 174 -------- .../configuration/IgniteConfiguration.java | 38 +- .../configuration/NearCacheConfiguration.java | 174 ++++++++ .../internal/GridEventConsumeHandler.java | 10 +- .../apache/ignite/internal/IgniteKernal.java | 29 +- .../ignite/internal/IgniteNodeAttributes.java | 2 +- .../org/apache/ignite/internal/IgnitionEx.java | 48 ++- .../discovery/GridDiscoveryManager.java | 169 ++++++-- .../affinity/GridAffinityAssignmentCache.java | 8 +- .../affinity/GridAffinityProcessor.java | 15 +- .../cache/DynamicCacheChangeRequest.java | 57 ++- .../cache/DynamicCacheDescriptor.java | 66 ++- .../processors/cache/GridCacheAttributes.java | 7 + .../GridCacheDefaultAffinityKeyMapper.java | 2 +- .../cache/GridCacheDeploymentManager.java | 17 +- .../cache/GridCacheEvictionManager.java | 2 +- .../processors/cache/GridCacheProcessor.java | 420 +++++++++---------- .../processors/cache/GridCacheUtils.java | 60 +-- .../distributed/dht/GridDhtCacheEntry.java | 2 +- .../dht/GridDhtPartitionTopologyImpl.java | 3 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 9 +- .../cache/query/GridCacheQueryAdapter.java | 4 +- .../cache/query/GridCacheQueryManager.java | 2 +- .../continuous/CacheContinuousQueryHandler.java | 20 +- .../jdbc/GridCacheQueryJdbcMetadataTask.java | 11 +- .../query/jdbc/GridCacheQueryJdbcTask.java | 11 +- .../jdbc/GridCacheQueryJdbcValidationTask.java | 7 +- .../ignite/internal/util/IgniteUtils.java | 123 ------ .../ignite/internal/util/lang/GridFunc.java | 30 -- ...idCacheConfigurationConsistencySelfTest.java | 8 +- .../cache/IgniteDynamicCacheStartSelfTest.java | 61 +-- .../ignite/testframework/junits/IgniteMock.java | 21 + 37 files changed, 915 insertions(+), 779 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 31b827e..09991d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -157,7 +157,7 @@ public interface Ignite extends AutoCloseable { /** * Creates new {@link ExecutorService} which will execute all submitted - * {@link java.util.concurrent.Callable} and {@link Runnable} jobs on nodes in this grid projection. + * {@link Callable} and {@link Runnable} jobs on nodes in this grid projection. * This essentially * creates a <b><i>Distributed Thread Pool</i></b> that can be used as a * replacement for local thread pools. @@ -187,6 +187,25 @@ public interface Ignite extends AutoCloseable { public IgniteScheduler scheduler(); /** + * Dynamically starts new cache with the given cache configuration. + * + * @param cacheCfg Cache configuration to use. + */ + public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg); + + public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg, + @Nullable NearCacheConfiguration<K, V> nearCfg); + + public <K, V> IgniteCache<K, V> createCache(@Nullable NearCacheConfiguration<K, V> nearCfg); + + /** + * Stops dynamically started cache. + * + * @param cacheName Cache name to stop. + */ + public void destroyCache(String cacheName); + + /** * Gets an instance of {@link IgniteCache} API. {@code IgniteCache} is a fully-compatible * implementation of {@code JCache (JSR 107)} specification. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 547cbc6..139692d 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -344,6 +344,11 @@ public final class IgniteSystemProperties { public static final String IGNITE_EXCEPTION_REGISTRY_MAX_SIZE = "IGNITE_EXCEPTION_REGISTRY_MAX_SIZE"; /** + * Property controlling default behavior of cache client flag. + */ + public static final String IGNITE_CACHE_CLIENT = "IGNITE_CACHE_CLIENT"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/Ignition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignition.java b/modules/core/src/main/java/org/apache/ignite/Ignition.java index 9ff6fd5..5def17b 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignition.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignition.java @@ -132,6 +132,24 @@ public class Ignition { } /** + * Sets client mode flag. + * + * @param clientMode Client mode flag. + */ + public static void setClientMode(boolean clientMode) { + IgnitionEx.setClientMode(clientMode); + } + + /** + * Gets client mode flag. + * + * @return Client mode flag. + */ + public static boolean isClientMode() { + return IgnitionEx.isClientMode(); + } + + /** * Gets state of grid default grid. * * @return Default grid state. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index a4bc6df..723d327 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -23,8 +23,12 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.affinity.rendezvous.CacheRendezvousAffinityFunction; import org.apache.ignite.cache.eviction.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.spi.indexing.*; import org.jetbrains.annotations.*; @@ -163,6 +167,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default value for 'readFromBackup' flag. */ public static final boolean DFLT_READ_FROM_BACKUP = true; + /** Filter that accepts only server nodes. */ + public static final IgnitePredicate<ClusterNode> SERVER_NODES = new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + Boolean attr = n.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); + + return attr != null && !attr; + } + }; + + /** Filter that accepts all nodes. */ + public static final IgnitePredicate<ClusterNode> ALL_NODES = F.alwaysTrue(); + /** Cache name. */ private String name; @@ -322,6 +338,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Collection of type metadata. */ private Collection<CacheTypeMetadata> typeMeta; + /** Node filter specifying nodes on which this cache should be deployed. */ + private IgnitePredicate<ClusterNode> nodeFilter; + /** Empty constructor (all values are initialized to their defaults). */ public CacheConfiguration() { /* No-op. */ @@ -379,6 +398,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { name = cc.getName(); nearStartSize = cc.getNearStartSize(); nearEvictPlc = cc.getNearEvictionPolicy(); + nodeFilter = cc.getNodeFilter(); preloadMode = cc.getPreloadMode(); preloadBatchSize = cc.getPreloadBatchSize(); preloadDelay = cc.getPreloadPartitionedDelay(); @@ -542,6 +562,24 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * Gets filter which determines on what nodes the cache should be started. + * + * @return Predicate specifying on which nodes the cache should be started. + */ + public IgnitePredicate<ClusterNode> getNodeFilter() { + return nodeFilter; + } + + /** + * Sets filter which determines on what nodes the cache should be started. + * + * @param nodeFilter Predicate specifying on which nodes the cache should be started. + */ + public void setNodeFilter(IgnitePredicate<ClusterNode> nodeFilter) { + this.nodeFilter = nodeFilter; + } + + /** * Gets flag indicating whether eviction is synchronized between primary and * backup nodes on partitioned cache. If this parameter is {@code true} and * swap is disabled then {@link CacheProjection#evict(Object)} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/configuration/ClientCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientCacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientCacheConfiguration.java deleted file mode 100644 index ecfeadc..0000000 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientCacheConfiguration.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.configuration; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.*; - -import javax.cache.configuration.*; - -import static org.apache.ignite.configuration.CacheConfiguration.*; - -/** - * Client cache configuration. - */ -public class ClientCacheConfiguration<K, V> extends MutableConfiguration<K, V> { - /** Cache name. */ - private String name; - - /** Near enabled flag. */ - private boolean nearEnabled; - - /** Near cache eviction policy. */ - private CacheEvictionPolicy nearEvictPlc; - - /** Flag indicating whether eviction is synchronized with near nodes. */ - private boolean evictNearSync = DFLT_EVICT_NEAR_SYNCHRONIZED; - - /** Default near cache start size. */ - private int nearStartSize = DFLT_NEAR_START_SIZE; - - /** - * Empty constructor. - */ - public ClientCacheConfiguration() { - // No-op. - } - - /** - * @param cfg Configuration to copy. - */ - public ClientCacheConfiguration(CompleteConfiguration<K, V> cfg) { - super(cfg); - - // Preserve alphabetic order. - if (cfg instanceof CacheConfiguration) { - CacheConfiguration ccfg = (CacheConfiguration)cfg; - - evictNearSync = ccfg.isEvictNearSynchronized(); - name = ccfg.getName(); - nearEnabled = ccfg.isNearEnabled(); - nearEvictPlc = ccfg.getNearEvictionPolicy(); - nearStartSize = ccfg.getNearStartSize(); - } - else if (cfg instanceof ClientCacheConfiguration) { - ClientCacheConfiguration ccfg = (ClientCacheConfiguration)cfg; - - evictNearSync = ccfg.isEvictNearSynchronized(); - name = ccfg.getName(); - nearEnabled = ccfg.isNearEnabled(); - nearEvictPlc = ccfg.getNearEvictionPolicy(); - nearStartSize = ccfg.getNearStartSize(); - } - } - - /** - * Gets cache name. The cache can be accessed via {@link Ignite#jcache(String)} method. - * - * @return Cache name. - */ - public String getName() { - return name; - } - - /** - * Sets cache name. - * - * @param name Cache name. - */ - public void setName(String name) { - this.name = name; - } - - /** - * Gets near enabled flag. - * - * @return Near enabled flag. - */ - public boolean isNearEnabled() { - return nearEnabled; - } - - /** - * Sets near enabled flag. - * - * @param nearEnabled Near enabled flag. - */ - public void setNearEnabled(boolean nearEnabled) { - this.nearEnabled = nearEnabled; - } - - /** - * @return Near eviction policy. - */ - public CacheEvictionPolicy getNearEvictionPolicy() { - return nearEvictPlc; - } - - /** - * @param nearEvictPlc Near eviction policy. - */ - public void setNearEvictionPolicy(CacheEvictionPolicy nearEvictPlc) { - this.nearEvictPlc = nearEvictPlc; - } - - /** - * Gets flag indicating whether eviction on primary node is synchronized with - * near nodes where entry is kept. Default value is {@code true}. - * <p> - * Note that in most cases this property should be set to {@code true} to keep - * cache consistency. But there may be the cases when user may use some - * special near eviction policy to have desired control over near cache - * entry set. - * - * @return {@code true} If eviction is synchronized with near nodes in - * partitioned cache, {@code false} if not. - */ - public boolean isEvictNearSynchronized() { - return evictNearSync; - } - - /** - * Sets flag indicating whether eviction is synchronized with near nodes. - * - * @param evictNearSync {@code true} if synchronized, {@code false} if not. - */ - public void setEvictNearSynchronized(boolean evictNearSync) { - this.evictNearSync = evictNearSync; - } - - /** - * Gets initial cache size for near cache which will be used to pre-create internal - * hash table after start. Default value is defined by {@link CacheConfiguration#DFLT_NEAR_START_SIZE}. - * - * @return Initial near cache size. - */ - public int getNearStartSize() { - return nearStartSize; - } - - /** - * Start size for near cache. This property is only used for {@link CacheMode#PARTITIONED} caching mode. - * - * @param nearStartSize Start size for near cache. - */ - public void setNearStartSize(int nearStartSize) { - this.nearStartSize = nearStartSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index c84daab..8a4536d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -321,7 +321,10 @@ public class IgniteConfiguration { private CacheConfiguration[] cacheCfg; /** Client cache configurations. */ - private ClientCacheConfiguration[] clientCacheCfg; + private NearCacheConfiguration[] nearCacheCfg; + + /** Client mode flag. */ + private Boolean clientMode; /** Transactions configuration. */ private TransactionConfiguration txCfg = new TransactionConfiguration(); @@ -1569,17 +1572,40 @@ public class IgniteConfiguration { } /** - * Gets configuration (descriptors) for all client caches. + * Gets configuration (descriptors) for all near caches. * * @return Client cache configurations. */ - public ClientCacheConfiguration[] getClientCacheConfiguration() { - return clientCacheCfg; + public NearCacheConfiguration[] getNearCacheConfiguration() { + return nearCacheCfg; } + /** + * Sets configuration for all near caches. + * + * @param nearCacheCfg Near cache configurations. + */ @SuppressWarnings({"ZeroLengthArrayAllocation"}) - public void setClientCacheConfiguration(ClientCacheConfiguration... clientCacheCfg) { - this.clientCacheCfg = clientCacheCfg == null ? new ClientCacheConfiguration[0] : clientCacheCfg; + public void setNearCacheConfiguration(NearCacheConfiguration... nearCacheCfg) { + this.nearCacheCfg = nearCacheCfg == null ? new NearCacheConfiguration[0] : nearCacheCfg; + } + + /** + * Gets client mode flag. + * + * @return Client mode flag. + */ + public Boolean isClientMode() { + return clientMode; + } + + /** + * Sets client mode flag. + * + * @param clientMode Client mode flag. + */ + public void setClientMode(boolean clientMode) { + this.clientMode = clientMode; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/configuration/NearCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/NearCacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/NearCacheConfiguration.java new file mode 100644 index 0000000..e729b9d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/NearCacheConfiguration.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; + +import javax.cache.configuration.*; + +import static org.apache.ignite.configuration.CacheConfiguration.*; + +/** + * Client cache configuration. + */ +public class NearCacheConfiguration<K, V> extends MutableConfiguration<K, V> { + /** Cache name. */ + private String name; + + /** Near enabled flag. */ + private boolean nearEnabled; + + /** Near cache eviction policy. */ + private CacheEvictionPolicy nearEvictPlc; + + /** Flag indicating whether eviction is synchronized with near nodes. */ + private boolean evictNearSync = DFLT_EVICT_NEAR_SYNCHRONIZED; + + /** Default near cache start size. */ + private int nearStartSize = DFLT_NEAR_START_SIZE; + + /** + * Empty constructor. + */ + public NearCacheConfiguration() { + // No-op. + } + + /** + * @param cfg Configuration to copy. + */ + public NearCacheConfiguration(CompleteConfiguration<K, V> cfg) { + super(cfg); + + // Preserve alphabetic order. + if (cfg instanceof CacheConfiguration) { + CacheConfiguration ccfg = (CacheConfiguration)cfg; + + evictNearSync = ccfg.isEvictNearSynchronized(); + name = ccfg.getName(); + nearEnabled = ccfg.isNearEnabled(); + nearEvictPlc = ccfg.getNearEvictionPolicy(); + nearStartSize = ccfg.getNearStartSize(); + } + else if (cfg instanceof NearCacheConfiguration) { + NearCacheConfiguration ccfg = (NearCacheConfiguration)cfg; + + evictNearSync = ccfg.isEvictNearSynchronized(); + name = ccfg.getName(); + nearEnabled = ccfg.isNearEnabled(); + nearEvictPlc = ccfg.getNearEvictionPolicy(); + nearStartSize = ccfg.getNearStartSize(); + } + } + + /** + * Gets cache name. The cache can be accessed via {@link Ignite#jcache(String)} method. + * + * @return Cache name. + */ + public String getName() { + return name; + } + + /** + * Sets cache name. + * + * @param name Cache name. + */ + public void setName(String name) { + this.name = name; + } + + /** + * Gets near enabled flag. + * + * @return Near enabled flag. + */ + public boolean isNearEnabled() { + return nearEnabled; + } + + /** + * Sets near enabled flag. + * + * @param nearEnabled Near enabled flag. + */ + public void setNearEnabled(boolean nearEnabled) { + this.nearEnabled = nearEnabled; + } + + /** + * @return Near eviction policy. + */ + public CacheEvictionPolicy getNearEvictionPolicy() { + return nearEvictPlc; + } + + /** + * @param nearEvictPlc Near eviction policy. + */ + public void setNearEvictionPolicy(CacheEvictionPolicy nearEvictPlc) { + this.nearEvictPlc = nearEvictPlc; + } + + /** + * Gets flag indicating whether eviction on primary node is synchronized with + * near nodes where entry is kept. Default value is {@code true}. + * <p> + * Note that in most cases this property should be set to {@code true} to keep + * cache consistency. But there may be the cases when user may use some + * special near eviction policy to have desired control over near cache + * entry set. + * + * @return {@code true} If eviction is synchronized with near nodes in + * partitioned cache, {@code false} if not. + */ + public boolean isEvictNearSynchronized() { + return evictNearSync; + } + + /** + * Sets flag indicating whether eviction is synchronized with near nodes. + * + * @param evictNearSync {@code true} if synchronized, {@code false} if not. + */ + public void setEvictNearSynchronized(boolean evictNearSync) { + this.evictNearSync = evictNearSync; + } + + /** + * Gets initial cache size for near cache which will be used to pre-create internal + * hash table after start. Default value is defined by {@link CacheConfiguration#DFLT_NEAR_START_SIZE}. + * + * @return Initial near cache size. + */ + public int getNearStartSize() { + return nearStartSize; + } + + /** + * Start size for near cache. This property is only used for {@link CacheMode#PARTITIONED} caching mode. + * + * @param nearStartSize Start size for near cache. + */ + public void setNearStartSize(int nearStartSize) { + this.nearStartSize = nearStartSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 68d8c0b..81a65e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -21,10 +21,10 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -44,7 +44,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { private static final long serialVersionUID = 0L; /** Default callback. */ - private static final P2<UUID, Event> DFLT_CALLBACK = new P2<UUID, Event>() { + private static final IgniteBiPredicate<UUID,Event> DFLT_CALLBACK = new P2<UUID, Event>() { @Override public boolean apply(UUID uuid, Event e) { return true; } @@ -129,7 +129,9 @@ class GridEventConsumeHandler implements GridContinuousHandler { ctx.continuous().stopRoutine(routineId); } else { - ClusterNode node = ctx.discovery().node(nodeId); + GridDiscoveryManager disco = ctx.discovery(); + + ClusterNode node = disco.node(nodeId); if (node != null) { try { @@ -138,7 +140,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { if (evt instanceof CacheEvent) { String cacheName = ((CacheEvent)evt).cacheName(); - if (ctx.config().isPeerClassLoadingEnabled() && U.hasCache(node, cacheName)) { + if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) { wrapper.p2pMarshal(ctx.config().getMarshaller()); wrapper.cacheName = cacheName; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index bd3b397..8cd64a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1185,6 +1185,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { add(ATTR_JVM_PID, U.jvmPid()); + add(ATTR_CLIENT_MODE, cfg.isClientMode()); + // Build a string from JVM arguments, because parameters with spaces are split. SB jvmArgs = new SB(512); @@ -1919,7 +1921,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (log.isDebugEnabled()) for (Object key : U.asIterable(System.getProperties().keys())) - log.debug("System property [" + key + '=' + System.getProperty((String) key) + ']'); + log.debug("System property [" + key + '=' + System.getProperty((String)key) + ']'); } /** @@ -2250,6 +2252,31 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg) { + // TODO: implement. + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg, + @Nullable NearCacheConfiguration<K, V> nearCfg) { + // TODO: implement. + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(@Nullable NearCacheConfiguration<K, V> nearCfg) { + // TODO: implement. + return null; + } + + /** {@inheritDoc} */ + @Override public void destroyCache(String cacheName) { + // TODO: implement. + + } + /** * @return Public caches. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index ca6f79b..d1a829e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -130,7 +130,7 @@ public final class IgniteNodeAttributes { public static final String ATTR_SECURITY_SUBJECT = ATTR_PREFIX + ".security.subject"; /** Cache interceptors. */ - public static final String ATTR_CACHE_INTERCEPTORS = ATTR_PREFIX + ".cache.interceptors"; + public static final String ATTR_CLIENT_MODE = ATTR_PREFIX + ".cache.client"; /** * Enforces singleton. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 979be6c..6562bbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -117,7 +117,18 @@ public class IgnitionEx { private static final Collection<IgnitionListener> lsnrs = new GridConcurrentHashSet<>(4); /** */ - private static volatile boolean daemon; + private static ThreadLocal<Boolean> daemon = new ThreadLocal<Boolean>() { + @Override protected Boolean initialValue() { + return false; + } + }; + + /** */ + private static ThreadLocal<Boolean> clientMode = new ThreadLocal<Boolean>() { + @Override protected Boolean initialValue() { + return null; + } + }; /** * Checks runtime version to be 1.7.x or 1.8.x. @@ -156,7 +167,7 @@ public class IgnitionEx { * @param daemon Daemon flag to set. */ public static void setDaemon(boolean daemon) { - IgnitionEx.daemon = daemon; + IgnitionEx.daemon.set(daemon); } /** @@ -170,7 +181,25 @@ public class IgnitionEx { * @return Daemon flag. */ public static boolean isDaemon() { - return daemon; + return daemon.get(); + } + + /** + * Sets client mode flag. + * + * @param clientMode Client mode flag. + */ + public static void setClientMode(boolean clientMode) { + IgnitionEx.clientMode.set(clientMode); + } + + /** + * Gets client mode flag. + * + * @return Client mode flag. + */ + public static boolean isClientMode() { + return clientMode.get(); } /** @@ -1539,9 +1568,18 @@ public class IgnitionEx { myCfg.setLocalHost(F.isEmpty(locHost) ? myCfg.getLocalHost() : locHost); // Override daemon flag if it was set on the factory. - if (daemon) + if (daemon.get()) myCfg.setDaemon(true); + if (myCfg.isClientMode() == null) { + Boolean threadClient = clientMode.get(); + + if (threadClient == null) + myCfg.setClientMode(IgniteSystemProperties.getBoolean(IGNITE_CACHE_CLIENT, false)); + else + myCfg.setClientMode(threadClient); + } + // Check for deployment mode override. String depModeName = IgniteSystemProperties.getString(IGNITE_DEP_MODE_OVERRIDE); @@ -1872,6 +1910,7 @@ public class IgnitionEx { cache.setPreloadMode(SYNC); cache.setWriteSynchronizationMode(FULL_SYNC); cache.setAffinity(new CacheRendezvousAffinityFunction(false, 100)); + cache.setNodeFilter(CacheConfiguration.ALL_NODES); if (client) cache.setDistributionMode(CLIENT_ONLY); @@ -1896,6 +1935,7 @@ public class IgnitionEx { ccfg.setPreloadMode(SYNC); ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setCacheMode(cfg.getCacheMode()); + ccfg.setNodeFilter(CacheConfiguration.ALL_NODES); if (cfg.getCacheMode() == PARTITIONED) { ccfg.setBackups(cfg.getBackups()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 727ac13..8de3d1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -21,11 +21,11 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.client.util.GridConcurrentHashSet; import org.apache.ignite.internal.events.*; import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; @@ -168,7 +168,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private GridPlainInClosure<Serializable> customEvtLsnr; /** Map of dynamic cache filters. */ - private Map<String, IgnitePredicate<ClusterNode>> dynamicCacheFilters = new HashMap<>(); + private Map<String, CachePredicate> dynamicCacheFilters = new HashMap<>(); /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { @@ -208,9 +208,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param filter Cache filter. + * @param loc {@code True} if cache is local. */ - public void addDynamicCacheFilter(String cacheName, IgnitePredicate<ClusterNode> filter) { - dynamicCacheFilters.put(cacheName, filter); + public void addDynamicCacheFilter( + String cacheName, + IgnitePredicate<ClusterNode> filter, + boolean nearEnabled, + boolean loc + ) { + dynamicCacheFilters.put(cacheName, new CachePredicate(filter, nearEnabled, loc)); } /** @@ -1118,6 +1124,41 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Checks if node is a data node for the given cache. + * + * @param node Node to check. + * @param cacheName Cache name. + * @return {@code True} if node is a cache data node. + */ + public boolean cacheAffinityNode(ClusterNode node, String cacheName) { + CachePredicate predicate = dynamicCacheFilters.get(cacheName); + + return predicate != null && predicate.dataNode(node); + } + + /** + * @param node Node to check. + * @param cacheName Cache name. + * @return {@code True} if node has near cache enabled. + */ + public boolean cacheNearNode(ClusterNode node, String cacheName) { + CachePredicate predicate = dynamicCacheFilters.get(cacheName); + + return predicate != null && predicate.nearNode(node); + } + + /** + * @param node Node to check. + * @param cacheName Cache name. + * @return If cache with the given name is accessible on the given node. + */ + public boolean cacheNode(ClusterNode node, String cacheName) { + CachePredicate predicate = dynamicCacheFilters.get(cacheName); + + return predicate != null && predicate.cacheNode(node); + } + + /** * Checks if cache with given name has at least one node with near cache enabled. * * @param cacheName Cache name. @@ -1197,7 +1238,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets first grid node start time, see {@link org.apache.ignite.spi.discovery.DiscoverySpi#getGridStartTime()}. + * Gets first grid node start time, see {@link DiscoverySpi#getGridStartTime()}. * * @return Start time of the first grid node. */ @@ -1330,7 +1371,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Method is called when any discovery event occurs. * - * @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} for more details. + * @param type Discovery event type. See {@link DiscoveryEvent} for more details. * @param topVer Topology version. * @param node Remote node this event is connected with. * @param topSnapshot Topology snapshot. @@ -1853,52 +1894,29 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (node.order() > maxOrder0) maxOrder0 = node.order(); - GridCacheAttributes[] caches = node.attribute(ATTR_CACHE); - boolean hasCaches = false; - if (caches != null) { - nodesWithCaches.add(node); - - if (!loc.id().equals(node.id())) - rmtNodesWithCaches.add(node); - - for (GridCacheAttributes attrs : caches) { - addToMap(cacheMap, attrs.cacheName(), node); - - if (alive(node.id())) - addToMap(aliveCacheNodes, maskNull(attrs.cacheName()), node); - - if (attrs.isAffinityNode()) - addToMap(dhtNodesMap, attrs.cacheName(), node); - - if (attrs.nearCacheEnabled()) - nearEnabledSet.add(attrs.cacheName()); - - if (!loc.id().equals(node.id())) { - addToMap(rmtCacheMap, attrs.cacheName(), node); + for (Map.Entry<String, CachePredicate> entry : dynamicCacheFilters.entrySet()) { + String cacheName = entry.getKey(); - if (alive(node.id())) - addToMap(aliveRmtCacheNodes, maskNull(attrs.cacheName()), node); - } - } + CachePredicate filter = entry.getValue(); - hasCaches = true; - } + if (filter.cacheNode(node)) { + nodesWithCaches.add(node); - for (Map.Entry<String, IgnitePredicate<ClusterNode>> entry : dynamicCacheFilters.entrySet()) { - String cacheName = entry.getKey(); - IgnitePredicate<ClusterNode> filter = entry.getValue(); + if (!loc.id().equals(node.id())) + rmtNodesWithCaches.add(node); - if (filter.apply(node)) { addToMap(cacheMap, cacheName, node); if (alive(node.id())) addToMap(aliveCacheNodes, maskNull(cacheName), node); - addToMap(dhtNodesMap, cacheName, node); + if (filter.dataNode(node)) + addToMap(dhtNodesMap, cacheName, node); - // TODO IGNITE-45 client and near caches. + if (filter.nearNode(node)) + nearEnabledSet.add(cacheName); if (!loc.id().equals(node.id())) { addToMap(rmtCacheMap, cacheName, node); @@ -2159,7 +2177,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param exclNode Node to exclude. */ private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) { - for (String cacheName : U.cacheNames(exclNode)) { + for (String cacheName : dynamicCacheFilters.keySet()) { String maskedName = maskNull(cacheName); while (true) { @@ -2226,4 +2244,73 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes)); } } + + /** + * Cache predicate. + */ + private static class CachePredicate { + /** Cache filter. */ + private IgnitePredicate<ClusterNode> cacheFilter; + + /** If near cache is enabled on data nodes. */ + private boolean nearEnabled; + + /** Flag indicating if cache is local. */ + private boolean loc; + + /** Collection of client near nodes. */ + private Collection<UUID> nearNodes; + + /** + * @param cacheFilter Cache filter. + * @param nearEnabled Near enabled flag. + */ + private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) { + assert cacheFilter != null; + + this.cacheFilter = cacheFilter; + this.nearEnabled = nearEnabled; + this.loc = loc; + + nearNodes = new GridConcurrentHashSet<>(); + } + + /** + * @param nodeId Near node ID to add. + */ + public void addNearNode(UUID nodeId) { + nearNodes.add(nodeId); + } + + /** + * @param nodeId Near node ID to remove. + */ + public void removeNearNode(UUID nodeId) { + nearNodes.remove(nodeId); + } + + /** + * @param node Node to check. + * @return {@code True} if this node is a data node for given cache. + */ + public boolean dataNode(ClusterNode node) { + return cacheFilter.apply(node); + } + + /** + * @param node Node to check. + * @return {@code True} if cache is accessible on the given node. + */ + public boolean cacheNode(ClusterNode node) { + return !loc || cacheFilter.apply(node); + } + + /** + * @param node Node to check. + * @return {@code True} if near cache is present on the given nodes. + */ + public boolean nearNode(ClusterNode node) { + return (nearEnabled && cacheFilter.apply(node)) || nearNodes.contains(node.id()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index b964f83..c8a9e1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.affinity; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; @@ -35,8 +34,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; - /** * Affinity cached function. */ @@ -170,10 +167,9 @@ public class GridAffinityAssignmentCache { List<List<ClusterNode>> assignment; if (prevAssignment != null && discoEvt != null) { - CacheDistributionMode distroMode = U.distributionMode(discoEvt.eventNode(), ctx.name()); + boolean affNode = ctx.discovery().cacheAffinityNode(discoEvt.eventNode(), ctx.name()); - if (distroMode == null || // no cache on node. - distroMode == CLIENT_ONLY || distroMode == NEAR_ONLY) + if (!affNode) assignment = prevAssignment; else assignment = aff.assignPartitions(new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 58aad82..1924f2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -77,10 +77,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { // Clean up affinity functions if such cache no more exists. if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) { - final Collection<String> caches = new HashSet<>(); - - for (ClusterNode clusterNode : ((DiscoveryEvent)evt).topologyNodes()) - caches.addAll(U.cacheNames(clusterNode)); + final Collection<String> caches = ctx.cache().cacheNames(); final Collection<AffinityAssignmentKey> rmv = new HashSet<>(); @@ -186,7 +183,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ClusterNode loc = ctx.discovery().localNode(); - if (U.hasCache(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL) + if (ctx.discovery().cacheNode(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL) return Collections.singletonList(loc); AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); @@ -272,7 +269,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ClusterNode loc = ctx.discovery().localNode(); - if (U.hasCache(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL) + if (ctx.discovery().cacheNode(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL) return F.asMap(loc, (Collection<K>)keys); AffinityInfo affInfo = affinityCache(cacheName, topVer); @@ -298,7 +295,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ClusterNode loc = ctx.discovery().localNode(); // Check local node. - if (U.hasCache(loc, cacheName)) { + if (ctx.discovery().cacheNode(loc, cacheName)) { GridCacheContext<Object,Object> cctx = ctx.cache().internalCache(cacheName).context(); AffinityInfo info = new AffinityInfo( @@ -319,7 +316,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.discovery().remoteNodes(), new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { - return U.hasCache(n, cacheName); + return ctx.discovery().cacheNode(n, cacheName); } }); @@ -352,7 +349,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ClusterNode n = it.next(); - CacheMode mode = U.cacheMode(n, cacheName); + CacheMode mode = ctx.cache().cacheMode(cacheName); assert mode != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 2a61200..b6776a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import java.io.*; +import java.util.*; /** * Cache start/stop request. @@ -34,29 +34,28 @@ public class DynamicCacheChangeRequest implements Serializable { /** Stop cache name. */ @GridToStringExclude - private final String stopName; + private String stopName; /** Cache start configuration. */ - private final CacheConfiguration startCfg; + private CacheConfiguration startCfg; - /** Cache start node filter. */ - private final IgnitePredicate<ClusterNode> startNodeFltr; + /** Near node ID in case if near cache is being started. */ + private UUID nearNodeId; + + /** Near cache configuration. */ + private NearCacheConfiguration nearCacheCfg; /** * Constructor creates cache start request. * * @param startCfg Start cache configuration. - * @param startNodeFltr Start node filter. */ public DynamicCacheChangeRequest( - CacheConfiguration startCfg, - IgnitePredicate<ClusterNode> startNodeFltr + CacheConfiguration startCfg ) { this.startCfg = startCfg; - this.startNodeFltr = startNodeFltr; deploymentId = IgniteUuid.randomUuid(); - stopName = null; } /** @@ -66,9 +65,17 @@ public class DynamicCacheChangeRequest implements Serializable { */ public DynamicCacheChangeRequest(String stopName) { this.stopName = stopName; + } - startCfg = null; - startNodeFltr = null; + /** + * Constructor creates near cache start request. + * + * @param nearNodeId Near node ID. + * @param nearCacheCfg Near cache configuration. + */ + public DynamicCacheChangeRequest(UUID nearNodeId, NearCacheConfiguration nearCacheCfg) { + this.nearNodeId = nearNodeId; + this.nearCacheCfg = nearCacheCfg; } /** @@ -93,6 +100,13 @@ public class DynamicCacheChangeRequest implements Serializable { } /** + * @return If this is a near cache start request. + */ + public boolean isNearStart() { + return nearNodeId != null; + } + + /** * @return Cache name. */ public String cacheName() { @@ -100,17 +114,24 @@ public class DynamicCacheChangeRequest implements Serializable { } /** - * @return Cache configuration. + * @return Near node ID. */ - public CacheConfiguration startCacheConfiguration() { - return startCfg; + public UUID nearNodeId() { + return nearNodeId; } /** - * @return Node filter. + * @return Near cache configuration. */ - public IgnitePredicate<ClusterNode> startNodeFilter() { - return startNodeFltr; + public NearCacheConfiguration nearCacheCfg() { + return nearCacheCfg; + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration startCacheConfiguration() { + return startCfg; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index e7f7e2d..4ae96a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.cluster.*; +import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -34,20 +34,20 @@ public class DynamicCacheDescriptor { @GridToStringExclude private CacheConfiguration cacheCfg; - /** Deploy filter bytes. */ - @GridToStringExclude - private IgnitePredicate<ClusterNode> nodeFilter; - /** Cancelled flag. */ private boolean cancelled; + /** Validation error. */ + private IgniteCheckedException validationError; + + /** Locally configured flag. */ + private boolean locCfg; + /** * @param cacheCfg Cache configuration. - * @param nodeFilter Node filter. */ - public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid deploymentId) { + public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgniteUuid deploymentId) { this.cacheCfg = cacheCfg; - this.nodeFilter = nodeFilter; this.deploymentId = deploymentId; } @@ -59,17 +59,31 @@ public class DynamicCacheDescriptor { } /** - * @return Cache configuration. + * @param deploymentId Deployment ID. */ - public CacheConfiguration cacheConfiguration() { - return cacheCfg; + public void deploymentId(IgniteUuid deploymentId) { + this.deploymentId = deploymentId; } /** - * @return Node filter. + * @return Locally configured flag. */ - public IgnitePredicate<ClusterNode> nodeFilter() { - return nodeFilter; + public boolean locallyConfigured() { + return locCfg; + } + + /** + * @param locCfg Locally configured flag. + */ + public void locallyConfigured(boolean locCfg) { + this.locCfg = locCfg; + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration cacheConfiguration() { + return cacheCfg; } /** @@ -86,8 +100,32 @@ public class DynamicCacheDescriptor { return cancelled; } + /** + * @return {@code True} if descriptor is valid and cache should be started. + */ + public boolean valid() { + return validationError == null; + } + + /** + * @throws IgniteCheckedException If validation failed. + */ + public void checkValid() throws IgniteCheckedException { + if (validationError != null) + throw validationError; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheDescriptor.class, this, "cacheName", cacheCfg.getName()); } + + /** + * Sets validation error. + * + * @param e Validation error. + */ + public void validationFailed(IgniteCheckedException e) { + validationError = e; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java index 8cd51e9..5f715f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java @@ -359,6 +359,13 @@ public class GridCacheAttributes implements Serializable { } /** + * @return Interceptor class name. + */ + public String interceptorClassName() { + return className(ccfg.getInterceptor()); + } + + /** * @param obj Object to get class of. * @return Class name or {@code null}. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java index 2b6bfc9..61f1f07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java @@ -46,7 +46,7 @@ public class GridCacheDefaultAffinityKeyMapper implements CacheAffinityKeyMapper /** Injected ignite instance. */ @IgniteInstanceResource - private Ignite ignite; + private transient Ignite ignite; /** Reflection cache. */ private GridReflectionCache reflectCache = new GridReflectionCache( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java index c9467eb..eb60dba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java @@ -46,9 +46,6 @@ import static org.apache.ignite.events.EventType.*; * Deployment manager for cache. */ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { - /** Node filter. */ - private IgnitePredicate<ClusterNode> nodeFilter; - /** Cache class loader */ private volatile ClassLoader globalLdr; @@ -84,12 +81,6 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap @Override public void start0() throws IgniteCheckedException { globalLdr = new CacheClassLoader(cctx.gridConfig().getClassLoader()); - nodeFilter = new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return U.hasCaches(node); - } - }; - depEnabled = cctx.gridDeploy().enabled(); if (depEnabled) { @@ -141,7 +132,7 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap /** * Gets distributed class loader. Note that - * {@link #p2pContext(UUID, org.apache.ignite.lang.IgniteUuid, String, org.apache.ignite.configuration.DeploymentMode, Map, boolean)} must be + * {@link #p2pContext(UUID, IgniteUuid, String, DeploymentMode, Map, boolean)} must be * called from the same thread prior to using this class loader, or the * loading may happen for the wrong node or context. * @@ -180,8 +171,6 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap * @param ctx Cache context. */ public void unwind(GridCacheContext ctx) { - int cnt = 0; - List<CA> q; synchronized (undeploys) { @@ -191,6 +180,8 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap if (q == null) return; + int cnt = 0; + for (CA c : q) { c.apply(); @@ -809,7 +800,7 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap sndId, ldrId, participants, - nodeFilter); + F.<ClusterNode>alwaysTrue()); if (d != null) { Class cls = d.deployedClass(name); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 1ade451..93dde23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -178,7 +178,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V DiscoveryEvent discoEvt = (DiscoveryEvent)evt; // Notify backup worker on each topology change. - if (CU.affinityNode(cctx, discoEvt.eventNode())) + if (cctx.discovery().cacheAffinityNode(discoEvt.eventNode(), cctx.name())) backupWorker.addEvent(discoEvt); } },