# ignite-709 include internal keys for scan query
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/581f4d99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/581f4d99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/581f4d99 Branch: refs/heads/ignite-960 Commit: 581f4d99fc1dd31ca83631f2ceabd7187f1572fe Parents: 9d3ab16 Author: sboikov <sboi...@gridgain.com> Authored: Thu May 28 12:01:31 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu May 28 14:51:16 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 1 - .../processors/cache/GridCacheAdapter.java | 15 ++++++++- .../cache/GridCacheConcurrentMap.java | 21 ++++++++++--- .../processors/cache/GridCacheProcessor.java | 32 +++++++++++--------- .../processors/cache/GridCacheProxyImpl.java | 12 ++++++++ .../processors/cache/IgniteInternalCache.java | 5 +++ .../cache/query/GridCacheQueryAdapter.java | 2 ++ .../cache/query/GridCacheQueryErrorFuture.java | 2 ++ .../cache/query/GridCacheQueryManager.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 24 ++++++++++++--- .../cacheobject/IgniteCacheObjectProcessor.java | 5 ++- .../IgniteCacheObjectProcessorImpl.java | 2 +- 12 files changed, 93 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 5a03ed8..3f5d7b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -25,7 +25,6 @@ import org.apache.ignite.cache.eviction.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index bbd13f1..a8bf1f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -902,7 +902,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> keySet() { - return keySet((CacheEntryPredicate[]) null); + return keySet((CacheEntryPredicate[])null); + } + + /** {@inheritDoc} */ + @Override public Set<K> keySetx() { + return keySetx((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -4299,6 +4304,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * @param filter Filters to evaluate. + * @return Key set including internal keys. + */ + public Set<K> keySetx(@Nullable CacheEntryPredicate... filter) { + return map.keySetx(filter); + } + + /** * @param filter Primary key set. * @return Primary key set. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index bd3e0f2..db5eed1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -626,7 +626,19 @@ public class GridCacheConcurrentMap { public <K, V> Set<K> keySet(CacheEntryPredicate... filter) { checkWeakQueue(); - return new KeySet<>(this, filter); + return new KeySet<>(this, filter, false); + } + + /** + * Key set including internal keys. + * + * @param filter Filter. + * @return Set of the keys contained in this map. + */ + public <K, V> Set<K> keySetx(CacheEntryPredicate... filter) { + checkWeakQueue(); + + return new KeySet<>(this, filter, true); } /** @@ -1921,7 +1933,7 @@ public class GridCacheConcurrentMap { /** {@inheritDoc} */ @Override public void clear() { - ctx.cache().clearLocally0(new KeySet<K, V>(map, filter)); + ctx.cache().clearLocally0(new KeySet<K, V>(map, filter, false)); } /** {@inheritDoc} */ @@ -2171,11 +2183,12 @@ public class GridCacheConcurrentMap { /** * @param map Base map. * @param filter Key filter. + * @param internal Whether to allow internal keys. */ - private KeySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) { + private KeySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter, boolean internal) { assert map != null; - set = new Set0<>(map, nonInternal(filter)); + set = new Set0<>(map, internal ? filter : nonInternal(filter)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index d9a7755..3065a2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -48,7 +48,6 @@ import org.apache.ignite.internal.processors.plugin.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -666,8 +665,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { - List<GridCacheAdapter<?, ?>> locCaches = new ArrayList<>(registeredCaches.size()); - try { if (ctx.config().isDaemon()) return; @@ -735,9 +732,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { startCache(cache); jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); - - if (loc) - locCaches.add(cache); } } } @@ -785,21 +779,31 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter<?, ?> cache : caches.values()) onKernalStart(cache); + boolean utilityCacheStarted = false; + // Wait for caches in SYNC preload mode. - for (GridCacheAdapter<?, ?> cache : locCaches) { - CacheConfiguration cfg = cache.configuration(); + for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { + GridCacheAdapter cache = caches.get(maskNull(cfg.getName())); + + if (cache != null) { + if (cfg.getRebalanceMode() == SYNC) { + if (cfg.getCacheMode() == REPLICATED || + (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) { + cache.preloader().syncFuture().get(); + + if (CU.isUtilityCache(cache.name())) { + ctx.cacheObjects().onUtilityCacheStarted(); - if (cfg.getRebalanceMode() == SYNC) { - if (cfg.getCacheMode() == REPLICATED || - (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) - cache.preloader().syncFuture().get(); + utilityCacheStarted = true; + } + } + } } } - ctx.cacheObjects().onCacheProcessorStarted(); - assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started"; assert caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; + assert utilityCacheStarted; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 55d2f84..9a6d08a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -741,6 +741,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public Set<K> keySetx() { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.keySetx(); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { CacheOperationContext prev = gate.enter(opCtx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 5184115..ccce1b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -775,6 +775,11 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public Set<K> keySet(); /** + * @return Set of keys including internal keys. + */ + public Set<K> keySetx(); + + /** * Set of keys for which this node is primary. * This set is dynamic and may change with grid topology changes. * Note that this set will contain mappings for all keys, even if their values are http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 7e3fb26..fab490f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -376,10 +376,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return execute(null, rmtTransform, args); } + /** {@inheritDoc} */ @Override public QueryMetrics metrics() { return metrics.copy(); } + /** {@inheritDoc} */ @Override public void resetMetrics() { metrics = new GridCacheQueryMetricsAdapter(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java index 2999e7b..15eb368 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java @@ -43,6 +43,8 @@ public class GridCacheQueryErrorFuture<T> extends GridFinishedFuture<Collection< /** {@inheritDoc} */ @Nullable @Override public T next() throws IgniteCheckedException { + get(); + return null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 16a8028..32e9d63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -773,7 +773,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator(); + private Iterator<K> iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); { advance(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 97fd7f3..6277c5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -250,8 +250,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - public UUID executeQuery(CacheEntryUpdatedListener locLsnr, CacheEntryEventSerializableFilter rmtFilter, - int bufSize, long timeInterval, boolean autoUnsubscribe, ClusterGroup grp) throws IgniteCheckedException { + public UUID executeQuery(CacheEntryUpdatedListener locLsnr, + CacheEntryEventSerializableFilter rmtFilter, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + ClusterGroup grp) throws IgniteCheckedException + { return executeQuery0( locLsnr, rmtFilter, @@ -357,9 +362,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, final CacheEntryEventSerializableFilter rmtFilter, - int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean notifyExisting, - boolean oldValRequired, boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException { + private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, + final CacheEntryEventSerializableFilter rmtFilter, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + boolean internal, + boolean notifyExisting, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + ClusterGroup grp) throws IgniteCheckedException + { cctx.checkSecurity(SecurityPermission.CACHE_READ); if (grp == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index a04692d..f8e5a60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -18,12 +18,10 @@ package org.apache.ignite.internal.processors.cacheobject; import org.apache.ignite.*; -import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; /** @@ -32,8 +30,9 @@ import org.jetbrains.annotations.*; public interface IgniteCacheObjectProcessor extends GridProcessor { /** * @see GridComponent#onKernalStart() + * @throws IgniteCheckedException If failed. */ - public void onCacheProcessorStarted(); + public void onUtilityCacheStarted() throws IgniteCheckedException; /** * @param typeName Type name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/581f4d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index fe5a356..45fc121 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -208,7 +208,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ - @Override public void onCacheProcessorStarted() { + @Override public void onUtilityCacheStarted() throws IgniteCheckedException { // No-op. }