http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 7870d07..f867bb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -74,6 +74,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.*; /** * Cache processor. */ +@SuppressWarnings("unchecked") public class GridCacheProcessor extends GridProcessorAdapter { /** Shared cache context. */ private GridCacheSharedContext<?, ?> sharedCtx; @@ -109,11 +110,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { private ConcurrentMap<String, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>(); /** Dynamic caches. */ - private ConcurrentMap<String, DynamicCacheDescriptor> dynamicCaches = new ConcurrentHashMap<>(); + private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>(); /** */ private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>(); + /** */ + private volatile boolean validateCfg = true; + /** * @param ctx Kernal context. */ @@ -142,6 +146,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cfg.getMemoryMode() == null) cfg.setMemoryMode(DFLT_MEMORY_MODE); + if (cfg.getNodeFilter() == null) + cfg.setNodeFilter(CacheConfiguration.SERVER_NODES); + if (cfg.getAffinity() == null) { if (cfg.getCacheMode() == PARTITIONED) { CacheRendezvousAffinityFunction aff = new CacheRendezvousAffinityFunction(); @@ -239,7 +246,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { writer = writerFactory.create(); if (ldr != null || writer != null) { - final GridCacheLoaderWriterStore store = new GridCacheLoaderWriterStore(ldr, writer); + final CacheStore store = new GridCacheLoaderWriterStore(ldr, writer); cfg.setCacheStoreFactory(new Factory<CacheStore<? super Object, ? super Object>>() { @Override public CacheStore<? super Object, ? super Object> create() { @@ -616,6 +623,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheContext cacheCtx = createCache(cfg); + DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cfg, IgniteUuid.randomUuid()); + + desc.locallyConfigured(true); + + registeredCaches.put(cfg.getName(), desc); + + ctx.discovery().addDynamicCacheFilter(cfg.getName(), cfg.getNodeFilter(), cfg.isNearEnabled(), + cfg.getCacheMode() == LOCAL); + sharedCtx.addCacheContext(cacheCtx); startSeq.add(cacheCtx.cache()); @@ -653,32 +669,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { transactions = new IgniteTransactionsImpl(sharedCtx); - if (!(ctx.isDaemon() || F.isEmpty(ctx.config().getCacheConfiguration()))) { - GridCacheAttributes[] attrVals = new GridCacheAttributes[ctx.config().getCacheConfiguration().length]; - - Map<String, String> interceptors = new HashMap<>(); - - int i = 0; - - for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { - assert caches.containsKey(cfg.getName()) : cfg.getName(); - - GridCacheContext ctx = caches.get(cfg.getName()).context(); - - attrVals[i++] = new GridCacheAttributes(cfg, ctx.store().configuredStore()); - - if (cfg.getInterceptor() != null) - interceptors.put(cfg.getName(), cfg.getInterceptor().getClass().getName()); - } - - ctx.addNodeAttribute(ATTR_CACHE, attrVals); - - ctx.addNodeAttribute(ATTR_TX_CONFIG, ctx.config().getTransactionConfiguration()); - - if (!interceptors.isEmpty()) - ctx.addNodeAttribute(ATTR_CACHE_INTERCEPTORS, interceptors); - } - if (log.isDebugEnabled()) log.debug("Started cache processor."); } @@ -689,13 +679,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) { + checkTransactionConfiguration(n); + + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); + + CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); + } + } + + validateCfg = false; + // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : dynamicCaches.values()) { - if (hasStaticCache(desc.cacheConfiguration().getName())) - throw new IgniteCheckedException("Failed to start node (current grid has dynamic cache which " + - "conflicts with locally configured static cache: " + desc.cacheConfiguration().getName()); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + // Check if validation failed on node start. + desc.checkValid(); + + IgnitePredicate filter = desc.cacheConfiguration().getNodeFilter(); - if (desc.nodeFilter().apply(ctx.discovery().localNode())) { + if (filter.apply(ctx.discovery().localNode()) && !desc.locallyConfigured()) { GridCacheContext ctx = createCache(desc.cacheConfiguration()); ctx.dynamicDeploymentId(desc.deploymentId()); @@ -716,10 +721,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) - checkCache(n); - } + // Must call onKernalStart on shared managers after creation of fetched caches. + for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) + mgr.onKernalStart(); for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { GridCacheAdapter cache = e.getValue(); @@ -747,9 +751,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (IgniteInternalFuture<?> fut : preloadFuts.values()) ((GridCompoundFuture<Object, Object>)fut).markInitialized(); - for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) - mgr.onKernalStart(); - for (GridCacheAdapter<?, ?> cache : caches.values()) onKernalStart(cache); @@ -1220,11 +1221,32 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Gets a collection of currentlty started caches. + * + * @return Collection of started cache names. + */ + public Collection<String> cacheNames() { + return registeredCaches.keySet(); + } + + /** + * Gets cache mode. + * + * @param cacheName Cache name to check. + * @return Cache mode. + */ + public CacheMode cacheMode(String cacheName) { + DynamicCacheDescriptor desc = registeredCaches.get(cacheName); + + return desc != null ? desc.cacheConfiguration().getCacheMode() : null; + } + + /** * @param req Request to check. * @return {@code True} if change request was registered to apply. */ public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) { - DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName()); + DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); return desc != null && desc.deploymentId().equals(req.deploymentId()) && desc.cancelled() != req.isStart(); } @@ -1235,14 +1257,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException { assert req.isStart(); - if (hasStaticCache(req.cacheName())) { - U.warn(log, "Failed to start dynamic cache (a static cache with the same name is already " + - "configured: " + req.cacheName()); - - return; - } + IgnitePredicate nodeFilter = req.startCacheConfiguration().getNodeFilter(); - if (req.startNodeFilter().apply(ctx.discovery().localNode())) { + if (nodeFilter.apply(ctx.discovery().localNode())) { GridCacheContext cacheCtx = createCache(req.startCacheConfiguration()); cacheCtx.dynamicDeploymentId(req.deploymentId()); @@ -1297,10 +1314,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { jCacheProxies.put(cache.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); } else { - DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName()); + DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) - dynamicCaches.remove(req.cacheName(), desc); + registeredCaches.remove(req.cacheName(), desc); } DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.cacheName()); @@ -1346,13 +1363,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public Object collectDiscoveryData(UUID nodeId) { // Collect dynamically started caches to a single object. - Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(dynamicCaches.size()); + Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(registeredCaches.size()); - for (DynamicCacheDescriptor desc : dynamicCaches.values()) { - if (!desc.cancelled()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( - desc.cacheConfiguration(), - desc.nodeFilter()); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (!desc.cancelled() && desc.valid()) { + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration()); req.deploymentId(desc.deploymentId()); @@ -1369,46 +1384,53 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data; for (DynamicCacheChangeRequest req : batch.requests()) { - dynamicCaches.put(req.cacheName(), new DynamicCacheDescriptor( - req.startCacheConfiguration(), - req.startNodeFilter(), - req.deploymentId())); + DynamicCacheDescriptor existing = registeredCaches.get(req.cacheName()); - ctx.discovery().addDynamicCacheFilter(req.cacheName(), req.startNodeFilter()); - } - } - } + try { + if (req.isStart()) { + CacheConfiguration ccfg = req.startCacheConfiguration(); - /** - * @param cacheName Cache name to check. - * @return {@code True} if local node has a static cache with the given name configured. - */ - private boolean hasStaticCache(String cacheName) { - for (CacheConfiguration ccfg : ctx.config().getCacheConfiguration()) { - if (F.eq(cacheName, ccfg.getName())) - return true; - } + if (validateCfg && existing != null) + checkCache(existing.cacheConfiguration(), ccfg, nodeId); + + if (existing == null) { + registeredCaches.put(req.cacheName(), new DynamicCacheDescriptor( + ccfg, + req.deploymentId())); + } + else + existing.deploymentId(req.deploymentId()); - return false; + if (existing == null || existing.valid()) { + ctx.discovery().addDynamicCacheFilter(req.cacheName(), + ccfg.getNodeFilter(), ccfg.isNearEnabled(), ccfg.getCacheMode() == LOCAL); + } + } + else if (req.isNearStart()) { + // TODO IGNITE-45. + //ctx.discovery().addDynamicCacheFilter(req.cacheName(), + } + } + catch (IgniteCheckedException e) { + existing.validationFailed(e); + } + } + } } /** * Dynamically starts cache. * * @param ccfg Cache configuration. - * @param nodeFilter Node filter to select nodes on which the cache should be deployed. * @return Future that will be completed when cache is deployed. */ - public IgniteInternalFuture<?> dynamicStartCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) { + public IgniteInternalFuture<?> dynamicStartCache(CacheConfiguration ccfg) { try { - if (nodeFilter == null) - nodeFilter = F.alwaysTrue(); - CacheConfiguration cfg = new CacheConfiguration(ccfg); initialize(cfg); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cfg, nodeFilter); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cfg); return F.first(initiateCacheChanges(F.asList(req))); } @@ -1522,7 +1544,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration ccfg = req.startCacheConfiguration(); // Check if cache with the same name was concurrently started form different node. - if (dynamicCaches.containsKey(ccfg.getName())) { + if (registeredCaches.containsKey(ccfg.getName())) { // If local node initiated start, fail the start future. DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get(ccfg.getName()); @@ -1536,18 +1558,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; } - DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ccfg, req.startNodeFilter(), - req.deploymentId()); + DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ccfg, req.deploymentId()); - DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc); + DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc); - ctx.discovery().addDynamicCacheFilter(ccfg.getName(), startDesc.nodeFilter()); + ctx.discovery().addDynamicCacheFilter(ccfg.getName(), ccfg.getNodeFilter(), ccfg.isNearEnabled(), + ccfg.getCacheMode() == LOCAL); assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; } else { - DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName()); + DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); if (desc == null) { // If local node initiated start, fail the start future. @@ -1649,176 +1671,144 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Gets cache interceptor class name from node attributes. - * - * @param node Node. - * @param cacheName Cache name. - * @return Interceptor class name. - */ - @Nullable private String interceptor(ClusterNode node, @Nullable String cacheName) { - Map<String, String> map = node.attribute(ATTR_CACHE_INTERCEPTORS); - - return map != null ? map.get(cacheName) : null; - } - - /** * Checks that remote caches has configuration compatible with the local. * * @param rmt Node. * @throws IgniteCheckedException If check failed. */ - private void checkCache(ClusterNode rmt) throws IgniteCheckedException { - checkTransactionConfiguration(rmt); + private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, UUID rmt) throws IgniteCheckedException { + GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg, null); + GridCacheAttributes locAttr = new GridCacheAttributes(locCfg, null); - GridCacheAttributes[] rmtAttrs = U.cacheAttributes(rmt); - GridCacheAttributes[] locAttrs = U.cacheAttributes(ctx.discovery().localNode()); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode", + locAttr.cacheMode(), rmtAttr.cacheMode(), true); - // If remote or local node does not have cache configured, do nothing - if (F.isEmpty(rmtAttrs) || F.isEmpty(locAttrs)) - return; - - DeploymentMode locDepMode = ctx.config().getDeploymentMode(); - DeploymentMode rmtDepMode = rmt.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - - for (GridCacheAttributes rmtAttr : rmtAttrs) { - for (GridCacheAttributes locAttr : locAttrs) { - if (F.eq(rmtAttr.cacheName(), locAttr.cacheName())) { - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode", - locAttr.cacheMode(), rmtAttr.cacheMode(), true); + if (rmtAttr.cacheMode() != LOCAL) { + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor", + locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true); - if (rmtAttr.cacheMode() != LOCAL) { - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor", - interceptor(ctx.discovery().localNode(), rmtAttr.cacheName()), - interceptor(rmt, rmtAttr.cacheName()), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode", + "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode", - "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode", + "Cache preload mode", locAttr.cachePreloadMode(), rmtAttr.cachePreloadMode(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode", - "Cache preload mode", locAttr.cachePreloadMode(), rmtAttr.cachePreloadMode(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity", + locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity", - locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper", + "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(), + rmtAttr.cacheAffinityMapperClassName(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper", - "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(), - rmtAttr.cacheAffinityMapperClassName(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount", + "Affinity partitions count", locAttr.affinityPartitionsCount(), + rmtAttr.affinityPartitionsCount(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount", - "Affinity partitions count", locAttr.affinityPartitionsCount(), - rmtAttr.affinityPartitionsCount(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter", + locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter", - locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy", + locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy", - locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true); + if (!skipStoreConsistencyCheck(locAttr, rmtAttr)) { + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "store", "Cache store", + locAttr.storeClassName(), rmtAttr.storeClassName(), true); - if (!skipStoreConsistencyCheck(locAttr, rmtAttr)) { - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "store", "Cache store", - locAttr.storeClassName(), rmtAttr.storeClassName(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "readThrough", + "Read through enabled", locAttr.readThrough(), locAttr.readThrough(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "readThrough", - "Read through enabled", locAttr.readThrough(), locAttr.readThrough(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeThrough", - "Write through enabled", locAttr.writeThrough(), locAttr.writeThrough(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "loadPreviousValue", - "Load previous value enabled", locAttr.loadPreviousValue(), - locAttr.loadPreviousValue(), true); - } + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeThrough", + "Write through enabled", locAttr.writeThrough(), locAttr.writeThrough(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup", - "Transaction manager lookup", locAttr.transactionManagerLookupClassName(), - rmtAttr.transactionManagerLookupClassName(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "loadPreviousValue", + "Load previous value enabled", locAttr.loadPreviousValue(), + locAttr.loadPreviousValue(), true); + } - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout", - "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup", + "Transaction manager lookup", locAttr.transactionManagerLookupClassName(), + rmtAttr.transactionManagerLookupClassName(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultQueryTimeout", - "Default query timeout", locAttr.defaultQueryTimeout(), rmtAttr.defaultQueryTimeout(), - false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout", + "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultTimeToLive", - "Default time to live", locAttr.defaultTimeToLive(), rmtAttr.defaultTimeToLive(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultQueryTimeout", + "Default query timeout", locAttr.defaultQueryTimeout(), rmtAttr.defaultQueryTimeout(), + false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize", - "Preload batch size", locAttr.preloadBatchSize(), rmtAttr.preloadBatchSize(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultTimeToLive", + "Default time to live", locAttr.defaultTimeToLive(), rmtAttr.defaultTimeToLive(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "swapEnabled", - "Swap enabled", locAttr.swapEnabled(), rmtAttr.swapEnabled(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize", + "Preload batch size", locAttr.preloadBatchSize(), rmtAttr.preloadBatchSize(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode", - "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(), - true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "swapEnabled", + "Swap enabled", locAttr.swapEnabled(), rmtAttr.swapEnabled(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize", - "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(), - false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode", + "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(), + true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled", - "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize", + "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(), + false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency", - "Write behind flush frequency", locAttr.writeBehindFlushFrequency(), - rmtAttr.writeBehindFlushFrequency(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled", + "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize", - "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(), - false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency", + "Write behind flush frequency", locAttr.writeBehindFlushFrequency(), + rmtAttr.writeBehindFlushFrequency(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount", - "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(), - rmtAttr.writeBehindFlushThreadCount(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize", + "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(), + false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictMaxOverflowRatio", - "Eviction max overflow ratio", locAttr.evictMaxOverflowRatio(), - rmtAttr.evictMaxOverflowRatio(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount", + "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(), + rmtAttr.writeBehindFlushThreadCount(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "indexingSpiName", "IndexingSpiName", - locAttr.indexingSpiName(), rmtAttr.indexingSpiName(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictMaxOverflowRatio", + "Eviction max overflow ratio", locAttr.evictMaxOverflowRatio(), + rmtAttr.evictMaxOverflowRatio(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "queryIndexEnabled", - "Query index enabled", locAttr.queryIndexEnabled(), rmtAttr.queryIndexEnabled(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "indexingSpiName", "IndexingSpiName", + locAttr.indexingSpiName(), rmtAttr.indexingSpiName(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeValueBytes", - "Store value bytes", locAttr.storeValueBytes(), rmtAttr.storeValueBytes(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "queryIndexEnabled", + "Query index enabled", locAttr.queryIndexEnabled(), rmtAttr.queryIndexEnabled(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "queryIndexEnabled", - "Query index enabled", locAttr.queryIndexEnabled(), rmtAttr.queryIndexEnabled(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeValueBytes", + "Store value bytes", locAttr.storeValueBytes(), rmtAttr.storeValueBytes(), true); - if (locAttr.cacheMode() == PARTITIONED) { - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictSynchronized", - "Eviction synchronized", locAttr.evictSynchronized(), rmtAttr.evictSynchronized(), - true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "queryIndexEnabled", + "Query index enabled", locAttr.queryIndexEnabled(), rmtAttr.queryIndexEnabled(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictNearSynchronized", - "Eviction near synchronized", locAttr.evictNearSynchronized(), - rmtAttr.evictNearSynchronized(), true); + if (locAttr.cacheMode() == PARTITIONED) { + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictSynchronized", + "Eviction synchronized", locAttr.evictSynchronized(), rmtAttr.evictSynchronized(), + true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy", - "Near eviction policy", locAttr.nearEvictionPolicyClassName(), - rmtAttr.nearEvictionPolicyClassName(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictNearSynchronized", + "Eviction near synchronized", locAttr.evictNearSynchronized(), + rmtAttr.evictNearSynchronized(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors", - "Affinity include neighbors", locAttr.affinityIncludeNeighbors(), - rmtAttr.affinityIncludeNeighbors(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy", + "Near eviction policy", locAttr.nearEvictionPolicyClassName(), + rmtAttr.nearEvictionPolicyClassName(), false); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups", - "Affinity key backups", locAttr.affinityKeyBackups(), - rmtAttr.affinityKeyBackups(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors", + "Affinity include neighbors", locAttr.affinityIncludeNeighbors(), + rmtAttr.affinityIncludeNeighbors(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity.hashIdResolver", - "Partitioned cache affinity hash ID resolver class", - locAttr.affinityHashIdResolverClassName(), rmtAttr.affinityHashIdResolverClassName(), - true); - } - } - } + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups", + "Affinity key backups", locAttr.affinityKeyBackups(), + rmtAttr.affinityKeyBackups(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "deploymentMode", "Deployment mode", - locDepMode, rmtDepMode, true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity.hashIdResolver", + "Partitioned cache affinity hash ID resolver class", + locAttr.affinityHashIdResolverClassName(), rmtAttr.affinityHashIdResolverClassName(), + true); } } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index c93fca7..6604ccc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -609,42 +609,6 @@ public class GridCacheUtils { } /** - * Checks if given node has specified cache started and the local DHT storage is enabled. - * - * @param ctx Cache context. - * @param s Node shadow to check. - * @return {@code True} if given node has specified cache started. - */ - public static boolean affinityNode(GridCacheContext ctx, ClusterNode s) { - assert ctx != null; - assert s != null; - - GridCacheAttributes[] caches = s.attribute(ATTR_CACHE); - - if (caches != null) - for (GridCacheAttributes attrs : caches) - if (F.eq(ctx.namex(), attrs.cacheName())) - return attrs.isAffinityNode(); - - return false; - } - - /** - * Checks if given node contains configured cache with the name - * as described by given cache context. - * - * @param ctx Cache context. - * @param node Node to check. - * @return {@code true} if node contains required cache. - */ - public static boolean cacheNode(GridCacheContext ctx, ClusterNode node) { - assert ctx != null; - assert node != null; - - return U.hasCache(node, ctx.namex()); - } - - /** * Checks if near cache is enabled for cache context. * * @param ctx Cache context to check. @@ -757,11 +721,12 @@ public class GridCacheUtils { public static ClusterNode oldest(GridCacheSharedContext cctx, long topOrder) { ClusterNode oldest = null; - for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) + for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) { if (oldest == null || n.order() < oldest.order()) oldest = n; + } - assert oldest != null; + assert oldest != null : "Failed to find oldest node with caches: " + topOrder; assert oldest.order() <= topOrder || topOrder < 0; return oldest; @@ -1399,14 +1364,14 @@ public class GridCacheUtils { * @param log Logger used to log warning message (used only if fail flag is not set). * @param locCfg Local configuration. * @param rmtCfg Remote configuration. - * @param rmt Remote node. + * @param rmtNodeId Remote node. * @param attr Attribute name. * @param fail If true throws IgniteCheckedException in case of attribute values mismatch, otherwise logs warning. * @throws IgniteCheckedException If attribute values are different and fail flag is true. */ public static void checkAttributeMismatch(IgniteLogger log, CacheConfiguration locCfg, - CacheConfiguration rmtCfg, ClusterNode rmt, T2<String, String> attr, boolean fail) throws IgniteCheckedException { - assert rmt != null; + CacheConfiguration rmtCfg, UUID rmtNodeId, T2<String, String> attr, boolean fail) throws IgniteCheckedException { + assert rmtNodeId != null; assert attr != null; assert attr.get1() != null; assert attr.get2() != null; @@ -1415,7 +1380,7 @@ public class GridCacheUtils { Object rmtVal = U.property(rmtCfg, attr.get1()); - checkAttributeMismatch(log, rmtCfg.getName(), rmt, attr.get1(), attr.get2(), locVal, rmtVal, fail); + checkAttributeMismatch(log, rmtCfg.getName(), rmtNodeId, attr.get1(), attr.get2(), locVal, rmtVal, fail); } /** @@ -1423,7 +1388,7 @@ public class GridCacheUtils { * * @param log Logger used to log warning message (used only if fail flag is not set). * @param cfgName Remote cache name. - * @param rmt Remote node. + * @param rmtNodeId Remote node. * @param attrName Short attribute name for error message. * @param attrMsg Full attribute name for error message. * @param locVal Local value. @@ -1431,9 +1396,9 @@ public class GridCacheUtils { * @param fail If true throws IgniteCheckedException in case of attribute values mismatch, otherwise logs warning. * @throws IgniteCheckedException If attribute values are different and fail flag is true. */ - public static void checkAttributeMismatch(IgniteLogger log, String cfgName, ClusterNode rmt, String attrName, + public static void checkAttributeMismatch(IgniteLogger log, String cfgName, UUID rmtNodeId, String attrName, String attrMsg, @Nullable Object locVal, @Nullable Object rmtVal, boolean fail) throws IgniteCheckedException { - assert rmt != null; + assert rmtNodeId != null; assert attrName != null; assert attrMsg != null; @@ -1444,7 +1409,7 @@ public class GridCacheUtils { "system property) [cacheName=" + cfgName + ", local" + capitalize(attrName) + "=" + locVal + ", remote" + capitalize(attrName) + "=" + rmtVal + - ", rmtNodeId=" + rmt.id() + ']'); + ", rmtNodeId=" + rmtNodeId + ']'); } else { assert log != null; @@ -1453,7 +1418,7 @@ public class GridCacheUtils { "configuration) [cacheName=" + cfgName + ", local" + capitalize(attrName) + "=" + locVal + ", remote" + capitalize(attrName) + "=" + rmtVal + - ", rmtNodeId=" + rmt.id() + ']'); + ", rmtNodeId=" + rmtNodeId + ']'); } } } @@ -1523,6 +1488,7 @@ public class GridCacheUtils { cache.setSwapEnabled(false); cache.setQueryIndexEnabled(false); cache.setCacheStoreFactory(null); + cache.setNodeFilter(CacheConfiguration.ALL_NODES); cache.setEagerTtl(true); cache.setPreloadMode(SYNC); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index ad06eb0..02f799f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -370,7 +370,7 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { } // If remote node has no near cache, don't add it. - if (!U.hasNearCache(node, cacheName())) { + if (!cctx.discovery().cacheNearNode(node, cacheName())) { if (log.isDebugEnabled()) log.debug("Ignoring near reader because near cache is disabled: " + nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 59a1cbf..2c2911b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -172,7 +172,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K, lock.readLock().lock(); try { - assert topVer.topologyVersion() > 0; + assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + + ", cacheName=" + cctx.name() + ']'; return topVer; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 3efdaca..ddf4229 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1085,7 +1085,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { checkClearForceTransformBackups(req, locked); - boolean hasNear = U.hasNearCache(node, name()); + boolean hasNear = ctx.discovery().cacheNearNode(node, name()); GridCacheVersion ver = req.updateVersion(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5a66b21..e8a47ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -544,7 +544,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff if (log.isDebugEnabled()) log.debug("Initialized future: " + this); - if (!U.hasCaches(discoEvt.node())) + if (canSkipExchange()) onDone(exchId.topologyVersion()); else { // If this node is not oldest. @@ -567,6 +567,13 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff } /** + * @return {@code True} if no distributed exchange is needed. + */ + private boolean canSkipExchange() { + return false; // TODO ignite-45; + } + + /** * Starts dynamic caches. */ private void startCaches() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 0c81e5e..142b49a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -491,9 +491,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return F.view(CU.allNodes(cctx), new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { - CacheDistributionMode mode = U.distributionMode(n, cctx.name()); - - return (mode == PARTITIONED_ONLY || mode == NEAR_PARTITIONED) && + return cctx.discovery().cacheAffinityNode(n, cctx.name()) && (prj == null || prj.node(n.id()) != null); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 52327c2..898ac0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1762,7 +1762,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // Remote nodes that have current cache. Collection<ClusterNode> nodes = F.view(cctx.discovery().remoteNodes(), new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { - return U.hasCache(n, space); + return cctx.kernalContext().discovery().cacheAffinityNode(n, space); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 9502b3f..bfbced7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -210,19 +209,20 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { try { ClusterNode node = ctx.discovery().node(nodeId); - if (ctx.config().isPeerClassLoadingEnabled() && node != null && - U.hasCache(node, cacheName)) { - evt.entry().p2pMarshal(ctx.config().getMarshaller()); + if (node != null) { + if (ctx.config().isPeerClassLoadingEnabled()) { + evt.entry().p2pMarshal(ctx.config().getMarshaller()); - evt.entry().cacheName(cacheName); + evt.entry().cacheName(cacheName); - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); + GridCacheDeploymentManager depMgr = + ctx.cache().internalCache(cacheName).context().deploy(); - depMgr.prepare(evt.entry()); - } + depMgr.prepare(evt.entry()); + } - ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync); + ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync); + } } catch (IgniteCheckedException ex) { U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java index fee601b..bdc1b4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -44,13 +45,21 @@ public class GridCacheQueryJdbcMetadataTask extends ComputeTaskAdapter<String, b /** Marshaller. */ private static final Marshaller MARSHALLER = new JdkMarshaller(); + /** */ + @IgniteInstanceResource + private Ignite ignite; + /** {@inheritDoc} */ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable String cacheName) { Map<JdbcDriverMetadataJob, ClusterNode> map = new HashMap<>(); + IgniteKernal kernal = (IgniteKernal)ignite; + + GridDiscoveryManager discoMgr = kernal.context().discovery(); + for (ClusterNode n : subgrid) - if (U.hasCache(n, cacheName)) { + if (discoMgr.cacheAffinityNode(n, cacheName)) { map.put(new JdbcDriverMetadataJob(cacheName), n); break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java index 4c6f8e4..3b10235 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.lang.*; @@ -58,6 +59,9 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { /** Scheduler. */ private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1); + @IgniteInstanceResource + private Ignite ignite; + /** {@inheritDoc} */ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, byte[] arg) { try { @@ -85,9 +89,12 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { else { String cache = (String)args.get("cache"); - for (ClusterNode n : subgrid) - if (U.hasCache(n, cache)) + GridDiscoveryManager discoMgr = ((IgniteKernal)ignite).context().discovery(); + + for (ClusterNode n : subgrid) { + if (discoMgr.cacheAffinityNode(n, cache)) return F.asMap(new JdbcDriverJob(args, first), n); + } throw new IgniteException("Can't find node with cache: " + cache); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcValidationTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcValidationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcValidationTask.java index 1751930..7d01682 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcValidationTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcValidationTask.java @@ -20,8 +20,9 @@ package org.apache.ignite.internal.processors.cache.query.jdbc; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.resources.*; import org.jetbrains.annotations.*; @@ -43,8 +44,10 @@ public class GridCacheQueryJdbcValidationTask extends ComputeTaskSplitAdapter<St private Ignite ignite; @Override public Object execute() { + GridDiscoveryManager discoMgr = ((IgniteKernal)ignite).context().discovery(); + for (ClusterNode n : ignite.cluster().nodes()) - if (U.hasCache(n, cacheName)) + if (discoMgr.cacheNode(n, cacheName)) return true; return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index d2a8a45..ecd82ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -6940,72 +6940,6 @@ public abstract class IgniteUtils { } /** - * Gets cache attributes from the given node for the given cache name. - * - * @param n Node. - * @param cacheName Cache name. - * @return Attributes. - */ - @Nullable public static GridCacheAttributes cacheAttributes(ClusterNode n, @Nullable String cacheName) { - for (GridCacheAttributes a : cacheAttributes(n)) { - if (F.eq(a.cacheName(), cacheName)) - return a; - } - - return null; - } - - /** - * Gets view on all cache names started on the node. - * - * @param n Node to get cache names for. - * @return Cache names for the node. - */ - public static Collection<String> cacheNames(ClusterNode n) { - return F.viewReadOnly( - F.asList(n.<GridCacheAttributes[]>attribute(ATTR_CACHE)), - new C1<GridCacheAttributes, String>() { - @Override public String apply(GridCacheAttributes attrs) { - return attrs.cacheName(); - } - }); - } - - /** - * Checks if given node has specified cache started. - * - * @param n Node to check. - * @param cacheName Cache name to check. - * @return {@code True} if given node has specified cache started. - */ - public static boolean hasCache(ClusterNode n, @Nullable String cacheName) { - assert n != null; - - GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); - - if (caches != null) - for (GridCacheAttributes attrs : caches) - if (F.eq(cacheName, attrs.cacheName())) - return true; - - return false; - } - - /** - * Checks if given node has at least one cache. - * - * @param n Node to check. - * @return {@code True} if given node has specified cache started. - */ - public static boolean hasCaches(ClusterNode n) { - assert n != null; - - GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); - - return !F.isEmpty(caches); - } - - /** * Checks if given node has specified streamer started. * * @param n Node to check. @@ -7028,63 +6962,6 @@ public abstract class IgniteUtils { } /** - * Gets cache mode or a cache on given node or {@code null} if cache is not - * present on given node. - * - * @param n Node to check. - * @param cacheName Cache to check. - * @return Cache mode or {@code null} if cache is not found. - */ - @Nullable public static CacheMode cacheMode(ClusterNode n, String cacheName) { - GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); - - if (caches != null) - for (GridCacheAttributes attrs : caches) - if (F.eq(cacheName, attrs.cacheName())) - return attrs.cacheMode(); - - return null; - } - - /** - * Gets cache mode or a cache on given node or {@code null} if cache is not - * present on given node. - * - * @param n Node to check. - * @param cacheName Cache to check. - * @return Cache mode or {@code null} if cache is not found. - */ - @Nullable public static CacheAtomicityMode atomicityMode(ClusterNode n, String cacheName) { - GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); - - if (caches != null) - for (GridCacheAttributes attrs : caches) - if (F.eq(cacheName, attrs.cacheName())) - return attrs.atomicityMode(); - - return null; - } - - /** - * Gets cache distribution mode on given node or {@code null} if cache is not - * present on given node. - * - * @param n Node to check. - * @param cacheName Cache to check. - * @return Cache distribution mode or {@code null} if cache is not found. - */ - @Nullable public static CacheDistributionMode distributionMode(ClusterNode n, String cacheName) { - GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); - - if (caches != null) - for (GridCacheAttributes attrs : caches) - if (F.eq(cacheName, attrs.cacheName())) - return attrs.partitionedTaxonomy(); - - return null; - } - - /** * Checks if given node has near cache enabled for the specified * partitioned cache. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 5b99f58..162e4e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -8271,36 +8271,6 @@ public class GridFunc { } /** - * Gets node predicate which returns {@code true} for all nodes which have given cache names - * started. - * - * @param cacheNames Cache names to get predicate for. Empty array means default cache name. If - * {@code null} array is passed, then {@link #alwaysFalse()} predicate will be returned. - * @return Predicate which returns {@code true} for all nodes which have given cache names - * started. - */ - public static IgnitePredicate<ClusterNode> cacheNodesForNames(@Nullable final String... cacheNames) { - if (cacheNames == null) - return alwaysFalse(); - - return new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode n) { - Collection<String> names = U.cacheNames(n); - - for (String name : names) { - if (name == null && cacheNames.length == 0) - return true; - - if (U.containsStringArray(cacheNames, name, false)) - return true; - } - - return false; - } - }; - } - - /** * Gets event predicate that returns {@code true} only if event type is one of the given. * Note that if array of provided types is {@code null} or empty this method returns * predicate that evaluates to {@code false} when applying. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java index 311c36f..0fb638d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java @@ -138,7 +138,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac final GridStringLogger strLog = new GridStringLogger(false, log); - CU.checkAttributeMismatch(strLog, "cache", node, "cacheMode", "Cache mode", LOCAL, PARTITIONED, false); + CU.checkAttributeMismatch(strLog, "cache", node.id(), "cacheMode", "Cache mode", LOCAL, PARTITIONED, false); assertTrue("No expected message in log: " + strLog.toString(), strLog.toString().contains("Cache mode mismatch")); @@ -148,7 +148,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac GridTestUtils.assertThrows(log, new Callable<Void>() { /** {@inheritDoc} */ @Override public Void call() throws Exception { - CU.checkAttributeMismatch(strLog, "cache", node, "cacheMode", "Cache mode", LOCAL, PARTITIONED, true); + CU.checkAttributeMismatch(strLog, "cache", node.id(), "cacheMode", "Cache mode", LOCAL, PARTITIONED, true); return null; } }, IgniteCheckedException.class, "Cache mode mismatch"); @@ -161,7 +161,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac cfg2.setCacheMode(PARTITIONED); - CU.checkAttributeMismatch(strLog, cfg1, cfg2, node, new T2<>("cacheMode", "Cache mode"), false); + CU.checkAttributeMismatch(strLog, cfg1, cfg2, node.id(), new T2<>("cacheMode", "Cache mode"), false); assertTrue("No expected message in log: " + strLog.toString(), strLog.toString().contains("Cache mode mismatch")); @@ -169,7 +169,7 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac GridTestUtils.assertThrows(log, new Callable<Void>() { /** {@inheritDoc} */ @Override public Void call() throws Exception { - CU.checkAttributeMismatch(strLog, cfg1, cfg2, node, new T2<>("cacheMode", "Cache mode"), true); + CU.checkAttributeMismatch(strLog, cfg1, cfg2, node.id(), new T2<>("cacheMode", "Cache mode"), true); return null; } }, IgniteCheckedException.class, "Cache mode mismatch"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index d7769e9..7aa82f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -45,9 +45,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { private static final String TEST_ATTRIBUTE_NAME = "TEST_ATTRIBUTE_NAME"; /** */ - private boolean cache = true; - - /** */ public static final IgnitePredicate<ClusterNode> NODE_FILTER = new IgnitePredicate<ClusterNode>() { /** {@inheritDoc} */ @Override public boolean apply(ClusterNode n) { @@ -73,13 +70,11 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { cfg.setUserAttributes(F.asMap(TEST_ATTRIBUTE_NAME, testAttribute)); - if (cache) { - CacheConfiguration cacheCfg = new CacheConfiguration(); + CacheConfiguration cacheCfg = new CacheConfiguration(); - cacheCfg.setName(STATIC_CACHE_NAME); + cacheCfg.setName(STATIC_CACHE_NAME); - cfg.setCacheConfiguration(cacheCfg); - } + cfg.setCacheConfiguration(cacheCfg); return cfg; } @@ -110,7 +105,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName(DYNAMIC_CACHE_NAME); - futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue())); + futs.add(kernal.context().cache().dynamicStartCache(ccfg)); return null; } @@ -169,7 +164,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount())); - futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue())); + futs.add(kernal.context().cache().dynamicStartCache(ccfg)); return null; } @@ -240,7 +235,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName(DYNAMIC_CACHE_NAME); - kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get(); + kernal.context().cache().dynamicStartCache(ccfg).get(); for (int g = 0; g < nodeCount(); g++) { IgniteKernal kernal0 = (IgniteKernal)grid(g); @@ -248,6 +243,8 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures()) f.get(); + info("Getting cache for node: " + g); + assertNotNull(grid(g).jcache(DYNAMIC_CACHE_NAME)); } @@ -297,7 +294,9 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName(DYNAMIC_CACHE_NAME); - kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get(); + kernal.context().cache().dynamicStartCache(ccfg).get(); + + info(">>>>>>> Deployed dynamic cache"); startGrid(nodeCount()); @@ -354,7 +353,9 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName(DYNAMIC_CACHE_NAME); - kernal.context().cache().dynamicStartCache(ccfg, NODE_FILTER).get(); + ccfg.setNodeFilter(NODE_FILTER); + + kernal.context().cache().dynamicStartCache(ccfg).get(); startGrid(nodeCount() + 1); @@ -406,7 +407,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testFailWhenStaticCacheExists() throws Exception { + public void testFailWhenConfiguredCacheExists() throws Exception { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { final IgniteKernal kernal = (IgniteKernal)grid(0); @@ -417,36 +418,10 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { // Cache is already configured, should fail. ccfg.setName(STATIC_CACHE_NAME); - return kernal.context().cache().dynamicStartCache(ccfg, NODE_FILTER).get(); + ccfg.setNodeFilter(NODE_FILTER); + + return kernal.context().cache().dynamicStartCache(ccfg).get(); } }, IgniteCheckedException.class, null); } - - /** - * @throws Exception If failed. TODO Ignite-45. - */ - public void _testFailWhenStaticCacheExistsOnOtherNode() throws Exception { - cache = false; - - startGrid(nodeCount()); - - try { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - final IgniteKernal kernal = (IgniteKernal)grid(nodeCount()); - - CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - - // Cache is already configured, should fail. - ccfg.setName(STATIC_CACHE_NAME); - - return kernal.context().cache().dynamicStartCache(ccfg, NODE_FILTER).get(); - } - }, IgniteCheckedException.class, null); - } - finally { - stopGrid(nodeCount()); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6a5e48a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index 220d5d6..ed1fd9d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -166,6 +166,27 @@ public class IgniteMock implements Ignite { } /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg) { + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg, + @Nullable NearCacheConfiguration<K, V> nearCfg) { + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(@Nullable NearCacheConfiguration<K, V> nearCfg) { + return null; + } + + /** {@inheritDoc} */ + @Override public void destroyCache(String cacheName) { + // No-op. + } + + /** {@inheritDoc} */ @Override public IgniteTransactions transactions() { return null; }