# ignite-598: fix bugs and support remote validation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c280901d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c280901d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c280901d Branch: refs/heads/ignite-593 Commit: c280901d67d7273573f4bf326d8d7304523d28be Parents: b78dbff Author: Artem Shutak <ashu...@gridgain.com> Authored: Fri Apr 3 17:10:05 2015 +0300 Committer: Artem Shutak <ashu...@gridgain.com> Committed: Fri Apr 3 17:10:05 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 2 +- .../ignite/internal/GridCachePluginContext.java | 6 ++ .../processors/cache/GridCacheProcessor.java | 64 +++++++++++++------- .../processors/plugin/CachePluginManager.java | 51 ++++++++++++---- .../ignite/plugin/CachePluginContext.java | 11 ++++ .../ignite/plugin/CachePluginProvider.java | 4 +- 6 files changed, 101 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 22148ad..f7f8e26 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -407,7 +407,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** - * Cache name. If not provided or {@code null}, then this will be considered a default + * Cache name or {@code null} if not provided, then this will be considered a default * cache which can be accessed via {@link Ignite#cache(String)} method. Otherwise, if name * is provided, the cache will be accessed via {@link Ignite#cache(String)} method. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java index 2334bae..7c80db6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.plugin.*; @@ -64,6 +65,11 @@ public class GridCachePluginContext<C extends CachePluginConfiguration> implemen @Override public Ignite grid() { return ctx.grid(); } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return ctx.discovery().localNode(); + } /** {@inheritDoc} */ @Override public IgniteLogger log(Class<?> cls) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c366520..f21eabc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -627,6 +627,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { ClusterNode locNode = ctx.discovery().localNode(); + // Init cache plugin managers. + final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); + + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration locCcfg = desc.cacheConfiguration(); + + CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); + + cache2PluginMgr.put(locCcfg.getName(), pluginMgr); + } + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { for (ClusterNode n : ctx.discovery().remoteNodes()) { checkTransactionConfiguration(n); @@ -640,8 +651,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (DynamicCacheDescriptor desc : registeredCaches.values()) { CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); - if (rmtCfg != null) - checkCache(desc.cacheConfiguration(), rmtCfg, n); + if (rmtCfg != null) { + CacheConfiguration locCfg = desc.cacheConfiguration(); + + checkCache(locCfg, rmtCfg, n); + + // Check plugin cache configurations. + CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); + + assert pluginMgr != null : " Map=" + cache2PluginMgr; + + pluginMgr.validateRemotes(rmtCfg, n); + } } } } @@ -661,7 +682,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (filter.apply(locNode)) { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(), ccfg); - GridCacheContext ctx = createCache(ccfg, cacheObjCtx); + CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); + + assert pluginMgr != null : " Map=" + cache2PluginMgr; + + GridCacheContext ctx = createCache(ccfg, pluginMgr, cacheObjCtx); ctx.dynamicDeploymentId(desc.deploymentId()); @@ -975,17 +1000,24 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param cfg Cache configuration to use to create cache. + * @param pluginMgr Cache plugin manager. * @return Cache context. * @throws IgniteCheckedException If failed to create cache. */ @SuppressWarnings({"unchecked"}) - private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException { + private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, @Nullable CachePluginManager pluginMgr, + CacheObjectContext cacheObjCtx) throws IgniteCheckedException { assert cfg != null; CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null; validate(ctx.config(), cfg, cfgStore); + if (pluginMgr == null) + pluginMgr = new CachePluginManager(ctx, cfg); + + pluginMgr.validate(); + CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName() == null); jta.createTmLookup(cfg); @@ -1017,16 +1049,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager(); CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager(); GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); - CachePluginManager pluginMgr = new CachePluginManager(ctx, cfg); - pluginMgr.validate(); - - CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(ctx, cfg, - CacheConflictResolutionManager.class); - - GridCacheDrManager drMgr = pluginMgr.createComponent(ctx, cfg,GridCacheDrManager.class); - - CacheStoreManager storeMgr = pluginMgr.createComponent(ctx, cfg, CacheStoreManager.class); + CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(CacheConflictResolutionManager.class); + GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class); + CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class); storeMgr.initialize(cfgStore, sesHolders); @@ -1156,7 +1182,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { evictMgr = new GridCacheEvictionManager(); evtMgr = new GridCacheEventManager(); pluginMgr = new CachePluginManager(ctx, cfg); - drMgr = pluginMgr.createComponent(ctx, cfg, GridCacheDrManager.class); + drMgr = pluginMgr.createComponent(GridCacheDrManager.class); cacheCtx = new GridCacheContext( ctx, @@ -1358,7 +1384,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(), ccfg); - GridCacheContext cacheCtx = createCache(ccfg, cacheObjCtx); + GridCacheContext cacheCtx = createCache(ccfg, null, cacheObjCtx); cacheCtx.startTopologyVersion(topVer); @@ -2028,14 +2054,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { true); } } - - // TODO 10006: implement remote configs validation. - // Check plugin configurations. -// for (CachePluginConfiguration locPluginCcfg : locCfg.getPluginConfigurations()) { -// CachePluginProvider provider = ...; -// -// provider.validateRemote(locCfg, locPluginCcfg, rmtCfg, rmtNode); -// } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java index 580ff49..640271a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.plugin; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; @@ -31,60 +32,71 @@ import java.util.*; * Cache plugin manager. */ public class CachePluginManager extends GridCacheManagerAdapter { + /** Providers list. To have providers order. */ + private final List<CachePluginProvider> providersList = new ArrayList<>(); + + /** */ + private final Map<CachePluginContext, CachePluginProvider> providersMap = new HashMap<>(); + /** */ - private final List<CachePluginProvider> providers = new ArrayList<>(); + private final GridKernalContext ctx; + + /** */ + private final CacheConfiguration cfg; /** * @param ctx Context. * @param cfg Cache config. */ public CachePluginManager(GridKernalContext ctx, CacheConfiguration cfg) { + this.ctx = ctx; + this.cfg = cfg; + if (cfg.getPluginConfigurations() != null) { for (CachePluginConfiguration cachePluginCfg : cfg.getPluginConfigurations()) { CachePluginContext pluginCtx = new GridCachePluginContext(ctx, cfg, cachePluginCfg); CachePluginProvider provider = cachePluginCfg.createProvider(pluginCtx); - providers.add(provider); + providersList.add(provider); + providersMap.put(pluginCtx, provider); } } } /** {@inheritDoc} */ @Override protected void onKernalStart0() throws IgniteCheckedException { - for (CachePluginProvider provider : providers) + for (CachePluginProvider provider : providersList) provider.onIgniteStart(); } /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { - for (ListIterator<CachePluginProvider> iter = providers.listIterator(); iter.hasPrevious();) + for (ListIterator<CachePluginProvider> iter = providersList.listIterator(); iter.hasPrevious();) iter.previous().onIgniteStop(cancel); } /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - for (CachePluginProvider provider : providers) + for (CachePluginProvider provider : providersList) provider.start(); } /** {@inheritDoc} */ @Override protected void stop0(boolean cancel) { - for (ListIterator<CachePluginProvider> iter = providers.listIterator(); iter.hasPrevious();) + for (ListIterator<CachePluginProvider> iter = providersList.listIterator(); iter.hasPrevious();) iter.previous().stop(cancel); } /** * Creates optional component. * - * @param ctx Kernal context. - * @param cfg Cache configuration. * @param cls Component class. * @return Created component. */ @SuppressWarnings("unchecked") - public <T> T createComponent(GridKernalContext ctx, CacheConfiguration cfg, Class<T> cls) { - for (CachePluginProvider provider : providers) { + public <T> T createComponent(Class<T> cls) { + for (CachePluginProvider provider : providersList) { T res = (T)provider.createComponent(cls); if (res != null) @@ -107,7 +119,24 @@ public class CachePluginManager extends GridCacheManagerAdapter { * @throws IgniteCheckedException If validation failed. */ public void validate() throws IgniteCheckedException { - for (CachePluginProvider provider : providers) + for (CachePluginProvider provider : providersList) provider.validate(); } + + /** + * Checks that remote caches has configuration compatible with the local. + * + * @param rmtCfg Remote cache configuration. + * @param rmtNode Remote rmtNode. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public void validateRemotes(CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException { + for (Map.Entry<CachePluginContext, CachePluginProvider> entry : providersMap.entrySet()) { + CachePluginContext cctx = entry.getKey(); + CachePluginProvider provider = entry.getValue(); + + provider.validateRemote(cctx.igniteCacheConfiguration(), cctx.cacheConfiguration(), rmtCfg, rmtNode); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java index 580b40d..f034312 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java @@ -18,7 +18,9 @@ package org.apache.ignite.plugin; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.DiscoverySpi; /** * Cache plugin context. @@ -45,6 +47,15 @@ public interface CachePluginContext<C extends CachePluginConfiguration> { public Ignite grid(); /** + * Gets local grid node. Instance of local node is provided by underlying {@link DiscoverySpi} + * implementation used. + * + * @return Local grid node. + * @see DiscoverySpi + */ + public ClusterNode localNode(); + + /** * Gets logger for given class. * * @param cls Class to get logger for. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java index 4006a70..ea5f795 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java @@ -77,8 +77,8 @@ public interface CachePluginProvider<C extends CachePluginConfiguration> { * @param locCfg Local configuration. * @param locPluginCcfg Local plugin configuration. * @param rmtCfg Remote configuration. - * @param node Node. + * @param rmtNode Remote node. */ - public void validateRemote(CacheConfiguration locCfg, C locPluginCcfg, CacheConfiguration rmtCfg, ClusterNode node) + public void validateRemote(CacheConfiguration locCfg, C locPluginCcfg, CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException; }