# IGNITE-45 - Fixing query tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/910cdda4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/910cdda4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/910cdda4 Branch: refs/heads/ignite-45 Commit: 910cdda4e415ccce84cdf0a97d7e9114d1983ab5 Parents: 0003d70 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Fri Mar 13 17:43:13 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Fri Mar 13 17:43:13 2015 -0700 ---------------------------------------------------------------------- .../eviction/fifo/CacheFifoEvictionPolicy.java | 13 +++- .../eviction/igfs/CacheIgfsEvictionFilter.java | 3 +- .../CacheIgfsPerBlockLruEvictionPolicy.java | 19 ++++- .../eviction/lru/CacheLruEvictionPolicy.java | 13 +++- .../random/CacheRandomEvictionPolicy.java | 13 +++- .../internal/GridEventConsumeHandler.java | 9 ++- .../internal/GridMessageListenHandler.java | 9 ++- .../processors/cache/GridCacheProcessor.java | 60 ++++++++------- .../distributed/near/GridNearCacheAdapter.java | 14 +++- .../cache/query/GridCacheQueryManager.java | 16 ++-- .../continuous/CacheContinuousQueryHandler.java | 22 +++++- .../continuous/CacheContinuousQueryManager.java | 7 +- .../continuous/GridContinuousHandler.java | 15 +++- .../continuous/GridContinuousProcessor.java | 77 +++++++++++++++++++- .../processors/query/GridQueryProcessor.java | 4 +- ...ridCacheContinuousQueryAbstractSelfTest.java | 41 +++++++---- .../GridCacheAbstractFieldsQuerySelfTest.java | 4 +- .../cache/GridCacheAbstractQuerySelfTest.java | 63 +++++++++------- .../GridCacheReplicatedQuerySelfTest.java | 5 +- 19 files changed, 303 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java index b1162d7..5cfdd88 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.jdk8.backport.*; import org.jdk8.backport.ConcurrentLinkedDeque8.*; +import java.io.*; import java.util.*; /** @@ -32,7 +33,7 @@ import java.util.*; * maintained by attaching ordering metadata to cache entries. */ public class CacheFifoEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>, - CacheFifoEvictionPolicyMBean { + CacheFifoEvictionPolicyMBean, Externalizable { /** Maximum size. */ private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; @@ -172,6 +173,16 @@ public class CacheFifoEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>, } /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(max); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + max = in.readInt(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheFifoEvictionPolicy.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsEvictionFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsEvictionFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsEvictionFilter.java index d8b16c6..f09a1b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsEvictionFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsEvictionFilter.java @@ -21,11 +21,12 @@ import org.apache.ignite.cache.eviction.*; import org.apache.ignite.internal.processors.igfs.*; import javax.cache.*; +import java.io.*; /** * IGFS eviction filter which will not evict blocks of particular files. */ -public class CacheIgfsEvictionFilter implements CacheEvictionFilter { +public class CacheIgfsEvictionFilter implements CacheEvictionFilter, Serializable { /** {@inheritDoc} */ @Override public boolean evictAllowed(Cache.Entry entry) { Object key = entry.getKey(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsPerBlockLruEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsPerBlockLruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsPerBlockLruEvictionPolicy.java index d58fb4a..cd4009a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsPerBlockLruEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/igfs/CacheIgfsPerBlockLruEvictionPolicy.java @@ -26,6 +26,7 @@ import org.jdk8.backport.*; import org.jdk8.backport.ConcurrentLinkedDeque8.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; import java.util.regex.*; @@ -34,7 +35,7 @@ import java.util.regex.*; * IGFS eviction policy which evicts particular blocks. */ public class CacheIgfsPerBlockLruEvictionPolicy implements CacheEvictionPolicy<IgfsBlockKey, byte[]>, - CacheIgfsPerBlockLruEvictionPolicyMXBean { + CacheIgfsPerBlockLruEvictionPolicyMXBean, Externalizable { /** Maximum size. When reached, eviction begins. */ private volatile long maxSize; @@ -269,6 +270,22 @@ public class CacheIgfsPerBlockLruEvictionPolicy implements CacheEvictionPolicy<I return queue.size(); } + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(maxSize); + out.writeInt(maxBlocks); + out.writeObject(excludePaths); + out.writeObject(excludePatterns); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + maxSize = in.readLong(); + maxBlocks = in.readInt(); + excludePaths = (Collection<String>)in.readObject(); + excludePatterns = (Collection<Pattern>)in.readObject(); + } + /** * Check whether provided path must be excluded from evictions. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java index 3d67178..188eb2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.jdk8.backport.*; import org.jdk8.backport.ConcurrentLinkedDeque8.*; +import java.io.*; import java.util.*; /** @@ -32,7 +33,7 @@ import java.util.*; * information is maintained by attaching ordering metadata to cache entries. */ public class CacheLruEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>, - CacheLruEvictionPolicyMBean { + CacheLruEvictionPolicyMBean, Externalizable { /** Maximum size. */ private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; @@ -179,6 +180,16 @@ public class CacheLruEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>, } /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(max); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + max = in.readInt(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheLruEvictionPolicy.class, this, "size", queue.sizex()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java index 2a9fb9e..83f2e6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java @@ -23,6 +23,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; import javax.cache.*; +import java.io.*; /** * Cache eviction policy which will select random cache entry for eviction if cache @@ -34,7 +35,7 @@ import javax.cache.*; * key has the same probability of being accessed. */ public class CacheRandomEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>, - CacheRandomEvictionPolicyMBean { + CacheRandomEvictionPolicyMBean, Externalizable { /** Maximum size. */ private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; @@ -95,6 +96,16 @@ public class CacheRandomEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V } /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(max); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + max = in.readInt(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheRandomEvictionPolicy.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index f7fdad5..c60646e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -107,7 +107,12 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) + @Override public String cacheName() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; assert routineId != null; @@ -168,7 +173,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { ctx.event().addLocalEventListener(lsnr, types); - return true; + return RegisterStatus.REGISTERED; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 6412b63..4bfb57b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -90,10 +90,15 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public boolean register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { + @Override public String cacheName() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { ctx.io().addUserMessageListener(topic, pred); - return true; + return RegisterStatus.REGISTERED; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 402a784..38bb5a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -123,8 +123,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { public GridCacheProcessor(GridKernalContext ctx) { super(ctx); - caches = new LinkedHashMap<>(); - jCacheProxies = new HashMap<>(); + caches = new ConcurrentHashMap<>(); + jCacheProxies = new ConcurrentHashMap<>(); preloadFuts = new TreeMap<>(); sysCaches = new HashSet<>(); @@ -598,7 +598,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfgs[i] = cfg; // Replace original configuration value. - if (caches.containsKey(cfg.getName())) { + if (caches.containsKey(maskNull(cfg.getName()))) { String cacheName = cfg.getName(); if (cacheName != null) @@ -634,7 +634,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { GridCacheAdapter cache = e.getValue(); - jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false)); + jCacheProxies.put(maskNull(e.getKey()), new IgniteCacheProxy(cache.context(), cache, null, false)); } transactions = new IgniteTransactionsImpl(sharedCtx); @@ -685,11 +685,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { String name = ccfg.getName(); - caches.put(name, cache); + caches.put(maskNull(name), cache); startCache(cache); - jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false)); + jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); } } @@ -753,7 +753,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; for (String cacheName : stopSeq) { - GridCacheAdapter<?, ?> cache = caches.get(cacheName); + GridCacheAdapter<?, ?> cache = caches.get(maskNull(cacheName)); if (cache != null) stopCache(cache, cancel); @@ -780,7 +780,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; for (String cacheName : stopSeq) { - GridCacheAdapter<?, ?> cache = caches.get(cacheName); + GridCacheAdapter<?, ?> cache = caches.get(maskNull(cacheName)); if (cache != null) onKernalStop(cache, cancel); @@ -804,6 +804,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { private void startCache(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException { GridCacheContext<?, ?> cacheCtx = cache.context(); + ctx.query().onCacheStart(cacheCtx); + ctx.continuous().onCacheStart(cacheCtx); + CacheConfiguration cfg = cacheCtx.config(); // Start managers. @@ -877,7 +880,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.stop(cancel); } - ctx.kernalContext().query().onCacheStopped(cache.context()); + ctx.kernalContext().query().onCacheStop(ctx); + ctx.kernalContext().continuous().onCacheStop(ctx); U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.jta().tmLookup(), ctx.store().configuredStore())); @@ -1196,8 +1200,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheCtx.cache(dht); } - ctx.query().onCacheStarted(ret); - return ret; } @@ -1270,7 +1272,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { startCache(cacheCtx.cache()); onKernalStart(cacheCtx.cache()); - caches.put(cacheCtx.name(), cacheCtx.cache()); + caches.put(maskNull(cacheCtx.name()), cacheCtx.cache()); } } else if (req.isClientStart()) { @@ -1292,7 +1294,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { startCache(cacheCtx.cache()); onKernalStart(cacheCtx.cache()); - caches.put(cacheCtx.name(), cacheCtx.cache()); + caches.put(maskNull(cacheCtx.name()), cacheCtx.cache()); } } } @@ -1304,7 +1306,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert req.isStop(); // Break the proxy before exchange future is done. - IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(req.cacheName()); + IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName())); if (proxy != null) proxy.gate().onStopped(); @@ -1316,7 +1318,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void prepareCacheStop(DynamicCacheChangeRequest req) { assert req.isStop(); - GridCacheAdapter<?, ?> cache = caches.remove(req.cacheName()); + GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName())); if (cache != null) { GridCacheContext<?, ?> ctx = cache.context(); @@ -1338,24 +1340,24 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") public void onExchangeDone(DynamicCacheChangeRequest req) { + String masked = maskNull(req.cacheName()); + if (req.isStart() || req.isClientStart()) { - GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); + GridCacheAdapter<?, ?> cache = caches.get(masked); if (cache != null) - jCacheProxies.put(cache.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); } else { prepareCacheStop(req); - String masked = maskNull(req.cacheName()); - DynamicCacheDescriptor desc = registeredCaches.get(masked); if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) registeredCaches.remove(masked, desc); } - DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(masked); assert req.deploymentId() != null; assert fut == null || fut.deploymentId != null; @@ -1918,7 +1920,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * it is called from synchronization block within Swap SPI. */ - GridCacheAdapter cache = caches.get(CU.cacheNameForSwapSpaceName(spaceName)); + GridCacheAdapter cache = caches.get(maskNull(CU.cacheNameForSwapSpaceName(spaceName))); assert cache != null : "Failed to resolve cache name for swap space name: " + spaceName; @@ -1994,7 +1996,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting cache for name: " + name); - IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(maskNull(name)); return jcache == null ? null : jcache.legacyProxy(); } @@ -2069,7 +2071,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (sysCaches.contains(name)) throw new IllegalStateException("Failed to get cache because it is system cache: " + name); - IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(maskNull(name)); if (jcache == null) throw new IllegalArgumentException("Cache is not configured: " + name); @@ -2092,10 +2094,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to get cache because it is system cache: " + name); try { - IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(name); + String masked = maskNull(name); + + IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(masked); if (cache == null) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); + DynamicCacheDescriptor desc = registeredCaches.get(masked); if (desc == null || desc.cancelled()) throw new IllegalArgumentException("Cache is not started: " + name); @@ -2107,7 +2111,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { F.first(initiateCacheChanges(F.asList(req))).get(); - cache = (IgniteCache<K, V>)jCacheProxies.get(name); + cache = (IgniteCache<K, V>)jCacheProxies.get(masked); if (cache == null) throw new IllegalArgumentException("Cache is not started: " + name); @@ -2126,7 +2130,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") public <K, V> IgniteCacheProxy<K, V> jcache(@Nullable String name) { - IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jCacheProxies.get(maskNull(name)); if (cache == null) throw new IllegalArgumentException("Cache is not configured: " + name); @@ -2174,7 +2178,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting internal cache adapter: " + name); - return (GridCacheAdapter<K, V>)caches.get(name); + return (GridCacheAdapter<K, V>)caches.get(maskNull(name)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 933fad4..a98149e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -303,7 +303,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @Override public int size() { - return super.size() + dht().size(); + return nearEntries().size() + dht().size(); } /** {@inheritDoc} */ @@ -313,14 +313,22 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @Override public int nearSize() { - return super.size(); + return nearEntries().size(); } /** * @return Near entries. */ public Set<Cache.Entry<K, V>> nearEntries() { - return super.entrySet(CU.empty0()); + final AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); + + return super.entrySet(new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx entry) { + GridNearCacheEntry nearEntry = (GridNearCacheEntry)entry; + + return nearEntry.valid(topVer); + } + }); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index e829c87..acffd2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -590,15 +590,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte T2<String, List<Object>> resKey = null; - if (qry.type() == SQL_FIELDS) { - if (qry.clause() == null) { - assert !loc; + if (qry.clause() == null) { + assert !loc; - throw new IgniteCheckedException("Received next page request after iterator was removed. " + - "Consider increasing maximum number of stored iterators (see " + - "GridCacheConfiguration.getMaximumQueryIteratorCount() configuration property)."); - } + throw new IgniteCheckedException("Received next page request after iterator was removed. " + + "Consider increasing maximum number of stored iterators (see " + + "GridCacheConfiguration.getMaximumQueryIteratorCount() configuration property)."); + } + if (qry.type() == SQL_FIELDS) { if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { cctx.gridEvents().record(new CacheQueryExecutedEvent<>( cctx.localNode(), @@ -629,7 +629,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte resKey = null; // Failed to cache result. } else { - assert qry.type() == SPI; + assert qry.type() == SPI : "Unexpected query type: " + qry.type(); if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { cctx.gridEvents().record(new CacheQueryExecutedEvent<>( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index cec8820..cbfbecf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -145,7 +145,12 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) + @Override public String cacheName() { + return cacheName; + } + + /** {@inheritDoc} */ + @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; assert routineId != null; @@ -268,7 +273,12 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } }; - return manager(ctx).registerListener(routineId, lsnr, internal); + CacheContinuousQueryManager mgr = manager(ctx); + + if (mgr == null) + return RegisterStatus.DELAYED; + + return mgr.registerListener(routineId, lsnr, internal); } /** {@inheritDoc} */ @@ -292,7 +302,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @return Continuous query manager. */ private CacheContinuousQueryManager manager(GridKernalContext ctx) { - return cacheContext(ctx).continuousQueries(); + GridCacheContext<K, V> cacheCtx = cacheContext(ctx); + + return cacheCtx == null ? null : cacheCtx.continuousQueries(); } /** {@inheritDoc} */ @@ -426,7 +438,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) { assert ctx != null; - return ctx.cache().<K, V>internalCache(cacheName).context(); + GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); + + return cache == null ? null : cache.context(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 15402ce..d750240 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -38,7 +38,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static javax.cache.event.EventType.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.GridTopic.*; @@ -394,7 +393,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? cctx.kernalContext().job().currentTaskNameHash() : 0; - GridContinuousHandler hnd = new CacheContinuousQueryHandler<>( + GridContinuousHandler hnd = new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), locLsnr, @@ -473,7 +472,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param internal Internal flag. * @return Whether listener was actually registered. */ - boolean registerListener(UUID lsnrId, + GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId, CacheContinuousQueryListener lsnr, boolean internal) { boolean added; @@ -494,7 +493,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } } - return added; + return added ? GridContinuousHandler.RegisterStatus.REGISTERED : GridContinuousHandler.RegisterStatus.NOT_REGISTERED; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 69639c0..ce9b7c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -27,8 +27,16 @@ import java.util.*; /** * Continuous routine handler. */ +@SuppressWarnings("PublicInnerClass") public interface GridContinuousHandler extends Externalizable, Cloneable { /** + * Listener registration status. + */ + public enum RegisterStatus { + REGISTERED, NOT_REGISTERED, DELAYED + } + + /** * Registers listener. * * @param nodeId ID of the node that started routine. @@ -37,7 +45,7 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * @return Whether listener was actually registered. * @throws IgniteCheckedException In case of error. */ - public boolean register(UUID nodeId, UUID routineId, GridKernalContext ctx) throws IgniteCheckedException; + public RegisterStatus register(UUID nodeId, UUID routineId, GridKernalContext ctx) throws IgniteCheckedException; /** * Callback called after listener is registered and acknowledgement is sent. @@ -109,4 +117,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * @return {@code True} if for continuous queries. */ public boolean isForQuery(); + + /** + * @return Cache name if this is a continuous query handler. + */ + public String cacheName(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 1a177ad..78fd63c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; @@ -369,6 +370,45 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * Callback invoked when cache is started. + * + * @param ctx Cache context. + */ + public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException { + for (Map.Entry<UUID, RemoteRoutineInfo> entry : rmtInfos.entrySet()) { + UUID routineId = entry.getKey(); + RemoteRoutineInfo rmtInfo = entry.getValue(); + + GridContinuousHandler hnd = rmtInfo.hnd; + + if (hnd.isForQuery() && F.eq(ctx.name(), hnd.cacheName()) && rmtInfo.clearDelayedRegister()) { + GridContinuousHandler.RegisterStatus status = hnd.register(rmtInfo.nodeId, routineId, this.ctx); + + assert status != GridContinuousHandler.RegisterStatus.DELAYED; + + if (status == GridContinuousHandler.RegisterStatus.REGISTERED) + hnd.onListenerRegistered(routineId, this.ctx); + } + } + } + + /** + * @param ctx Callback invoked when cache is stopped. + */ + public void onCacheStop(GridCacheContext ctx) { + Iterator<Map.Entry<UUID, RemoteRoutineInfo>> it = rmtInfos.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry<UUID, RemoteRoutineInfo> entry = it.next(); + + GridContinuousHandler hnd = entry.getValue().hnd; + + if (hnd.isForQuery() && F.eq(ctx.name(), hnd.cacheName())) + it.remove(); + } + } + + /** * @param hnd Handler. * @param bufSize Buffer size. * @param interval Time interval. @@ -1044,7 +1084,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { checker.start(); } - return hnd.register(nodeId, routineId, ctx); + GridContinuousHandler.RegisterStatus status = hnd.register(nodeId, routineId, ctx); + + if (status == GridContinuousHandler.RegisterStatus.DELAYED) { + info.markDelayedRegister(); + + return false; + } + else + return status == GridContinuousHandler.RegisterStatus.REGISTERED; } return false; @@ -1250,6 +1298,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Automatic unsubscribe flag. */ private boolean autoUnsubscribe; + /** Delayed register flag. */ + private boolean delayedRegister; + /** * @param nodeId Master node ID. * @param hnd Continuous routine handler. @@ -1274,6 +1325,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * Marks info to be registered when cache is started. + */ + public void markDelayedRegister() { + assert hnd.isForQuery(); + + delayedRegister = true; + } + + /** + * Clears delayed register flag if it was set. + * + * @return {@code True} if flag was cleared. + */ + public boolean clearDelayedRegister() { + if (delayedRegister) { + delayedRegister = false; + + return true; + } + + return false; + } + + /** * @param obj Object to add. * @return Object to send or {@code null} if there is nothing to send for now. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 90cb4ae..5d1dbbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -144,7 +144,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param cctx Cache context. * @throws IgniteCheckedException If failed. */ - public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { + public void onCacheStart(GridCacheContext cctx) throws IgniteCheckedException { if (idx == null) return; @@ -162,7 +162,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. */ - public void onCacheStopped(GridCacheContext cctx) { + public void onCacheStop(GridCacheContext cctx) { if (idx == null) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 4a0e69f..b39daab 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -80,7 +80,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo cacheCfg.setNearConfiguration(nearConfiguration()); cacheCfg.setPreloadMode(ASYNC); cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); + cacheCfg.setCacheStoreFactory(new StoreFactory()); cacheCfg.setReadThrough(true); cacheCfg.setWriteThrough(true); cacheCfg.setLoadPreviousValue(true); @@ -142,7 +142,11 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo for (int i = 0; i < gridCount(); i++) { for (int j = 0; j < 5; j++) { try { - ((IgniteKernal)grid(i)).cache(null).removeAll(); + GridCache<Object, Object> cache = ((IgniteKernal)grid(i)).cache(null); + + for (Cache.Entry<Object, Object> entry : cache.localEntries(new CachePeekMode[] {CachePeekMode.ALL})) { + cache.remove(entry.getKey()); + } break; } @@ -159,8 +163,9 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } for (int i = 0; i < gridCount(); i++) - assertEquals("Cache is not empty: " + ((IgniteKernal)grid(i)).cache(null).entrySet(), 0, - ((IgniteKernal)grid(i)).cache(null).size()); + assertEquals("Cache is not empty [entrySet=" + ((IgniteKernal)grid(i)).cache(null).entrySet() + + ", i=" + i + ']', + 0, ((IgniteKernal)grid(i)).cache(null).size()); for (int i = 0; i < gridCount(); i++) { @@ -800,34 +805,29 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Collection<CacheEntryEvent<? extends Integer, ? extends Integer>> all = new ConcurrentLinkedDeque8<>(); - final CountDownLatch latch = new CountDownLatch(2); + final CountDownLatch latch = new CountDownLatch(30); qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - int size = 0; - - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) all.add(evt); - size++; - } - - assertEquals(1, size); - latch.countDown(); } }); try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { - cache.put(1, 1); + cache.put(0, 0); startGrid("anotherGrid"); - cache.put(2, 2); + for (int i = 1; i < 30; i++) { + cache.put(i, i); + } assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : all; - assertEquals(2, all.size()); + assertEquals(30, all.size()); } finally { stopGrid("anotherGrid"); @@ -930,6 +930,15 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } /** + * + */ + private static class StoreFactory implements Factory<CacheStore> { + @Override public CacheStore create() { + return new TestStore(); + } + } + + /** * Store. */ private static class TestStore extends CacheStoreAdapter<Object, Object> { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFieldsQuerySelfTest.java index de1ac5c..8a8ca47 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFieldsQuerySelfTest.java @@ -73,8 +73,10 @@ public abstract class GridCacheAbstractFieldsQuerySelfTest extends GridCommonAbs if (hasCache) cfg.setCacheConfiguration(cache(null, null), cache(CACHE, null), cache(EMPTY_CACHE, null)); - else + else { + cfg.setClientMode(true); cfg.setCacheConfiguration(); + } cfg.setDiscoverySpi(discovery()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractQuerySelfTest.java index 618e820..e459344 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractQuerySelfTest.java @@ -117,41 +117,43 @@ public abstract class GridCacheAbstractQuerySelfTest extends GridCommonAbstractT c.setMarshaller(new OptimizedMarshaller(false)); - CacheConfiguration[] ccs = new CacheConfiguration[2]; + if (!gridName.startsWith("client")) { + CacheConfiguration[] ccs = new CacheConfiguration[2]; - for (int i = 0; i < ccs.length; i++) { - CacheConfiguration cc = defaultCacheConfiguration(); + for (int i = 0; i < ccs.length; i++) { + CacheConfiguration cc = defaultCacheConfiguration(); - if (i > 0) - cc.setName("c" + i); + if (i > 0) + cc.setName("c" + i); - cc.setCacheMode(cacheMode()); - cc.setAtomicityMode(atomicityMode()); - // TODO IGNITE-45 -// cc.setDistributionMode(gridName.startsWith("client") ? CLIENT_ONLY : distributionMode()); - cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - cc.setLoadPreviousValue(true); - cc.setPreloadMode(SYNC); - cc.setSwapEnabled(true); + cc.setCacheMode(cacheMode()); + cc.setAtomicityMode(atomicityMode()); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setCacheStoreFactory(new StoreFactory()); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + cc.setPreloadMode(SYNC); + cc.setSwapEnabled(true); - CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); + CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); - qcfg.setIndexPrimitiveKey(true); - qcfg.setIndexFixedTyping(true); + qcfg.setIndexPrimitiveKey(true); + qcfg.setIndexFixedTyping(true); - cc.setQueryConfiguration(qcfg); + cc.setQueryConfiguration(qcfg); - // Explicitly set number of backups equal to number of grids. - if (cacheMode() == CacheMode.PARTITIONED) - cc.setBackups(gridCount()); + // Explicitly set number of backups equal to number of grids. + if (cacheMode() == CacheMode.PARTITIONED) + cc.setBackups(gridCount()); - ccs[i] = cc; - } + ccs[i] = cc; + } - c.setCacheConfiguration(ccs); + c.setCacheConfiguration(ccs); + } + else + c.setClientMode(true); return c; } @@ -2055,4 +2057,13 @@ public abstract class GridCacheAbstractQuerySelfTest extends GridCommonAbstractT return sum; } } + + /** + * + */ + private static class StoreFactory implements Factory<CacheStore> { + @Override public CacheStore create() { + return store; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/910cdda4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java index 73aa73a..73bad92 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java @@ -112,10 +112,13 @@ public class GridCacheReplicatedQuerySelfTest extends GridCacheAbstractQuerySelf try { Ignite g = startGrid("client"); + // Create client cache. + g.jcache(null); + GridCache<Integer, Integer> c = ((IgniteKernal)g).cache(null); for (int i = 0; i < 10; i++) - c.putx(i, i); + c.put(i, i); // Client cache should be empty. assertEquals(0, c.size());