ignite-921 Create scan query able to iterate over single partition
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f4cc4b6c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f4cc4b6c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f4cc4b6c Branch: refs/heads/ignite-389 Commit: f4cc4b6cecf0b521edcd8f07a10de8202832932d Parents: edf6ffc Author: agura <ag...@gridgain.com> Authored: Mon May 25 20:26:04 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Mon May 25 22:09:37 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 15 +- .../processors/cache/GridCacheSwapManager.java | 55 ++++- .../processors/cache/IgniteCacheProxy.java | 2 +- .../distributed/dht/GridDhtLocalPartition.java | 7 + .../processors/cache/query/CacheQuery.java | 2 +- .../query/GridCacheDistributedQueryManager.java | 3 + .../cache/query/GridCacheQueryAdapter.java | 28 ++- .../cache/query/GridCacheQueryManager.java | 200 ++++++++++++------- .../cache/query/GridCacheQueryRequest.java | 31 ++- .../datastructures/GridCacheSetImpl.java | 4 +- ...achePartitionedPreloadLifecycleSelfTest.java | 2 +- ...CacheReplicatedPreloadLifecycleSelfTest.java | 6 +- .../GridCacheSwapScanQueryAbstractSelfTest.java | 112 ++++++++--- .../cache/IgniteCacheAbstractQuerySelfTest.java | 53 ++++- 14 files changed, 384 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d390037..d7cec9e 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1317,8 +1317,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>(); - IgniteInternalFuture<Object> readFut = - readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() { + IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null, + subjId, taskName, new CI2<KeyCacheObject, Object>() { /** Version for all loaded entries. */ private GridCacheVersion nextVer = ctx.versions().next(); @@ -1948,7 +1948,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param filter Optional filter. * @return Put operation future. */ - public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) { + public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, + @Nullable final CacheEntryPredicate... filter) { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -3117,7 +3118,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException { + @Override public boolean lockAll(@Nullable Collection<? 
extends K> keys, long timeout) + throws IgniteCheckedException { if (F.isEmpty(keys)) return true; @@ -3689,7 +3691,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1) return localIteratorHonorExpirePolicy(opCtx); - CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable()) + CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable()) .keepAll(false) .execute(); @@ -3918,7 +3920,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return t; } - catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | IgniteTxRollbackCheckedException e) { + catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | + IgniteTxRollbackCheckedException e) { throw e; } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index eb82218..e4b1cbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1211,7 +1211,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); if (offHeapEnabled() && !swapEnabled()) - return rawOffHeapIterator(true, true); + return rawOffHeapIterator(null, true, true); if (swapEnabled() && !offHeapEnabled()) return rawSwapIterator(true, true); @@ -1227,7 +1227,7 @@ public class 
GridCacheSwapManager extends GridCacheManagerAdapter { private Map.Entry<byte[], byte[]> cur; { - it = rawOffHeapIterator(true, true); + it = rawOffHeapIterator(null, true, true); advance(); } @@ -1554,11 +1554,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param c Key/value closure. + * @param part Partition. * @param primary Include primaries. * @param backup Include backups. * @return Off-heap iterator. */ public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, + Integer part, boolean primary, boolean backup) { @@ -1574,24 +1576,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set<Integer> parts; + + if (part == null) + parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : + cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + else + parts = Collections.singleton(part); return new CloseablePartitionsIterator<T, T>(parts) { @Override protected GridCloseableIterator<T> partitionIterator(int part) - throws IgniteCheckedException - { + throws IgniteCheckedException { return offheap.iterator(spaceName, c, part); } }; } /** + * + * @param part Partition. * @param primary Include primaries. * @param backup Include backups. * @return Raw off-heap iterator. 
*/ - public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary, + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Nullable Integer part, + final boolean primary, final boolean backup) { if (!offheapEnabled || (!primary && !backup)) @@ -1626,8 +1635,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set<Integer> parts; + + if (part == null) + parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : + cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + else + parts = Collections.singleton(part); return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) { private Map.Entry<byte[], byte[]> cur; @@ -1701,6 +1715,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param part Partition. + * @return Raw off-heap iterator. + * @throws IgniteCheckedException If failed. + */ + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part) + throws IgniteCheckedException + { + if (!swapEnabled) + return new GridEmptyCloseableIterator<>(); + + checkIteratorQueue(); + + return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>( + Collections.singleton(part)) { + @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part) + throws IgniteCheckedException + { + return swapMgr.rawIterator(spaceName, part); + } + }; + } + + /** * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @param topVer Topology version. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 2de5bf0..8833232 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -353,7 +353,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (filter instanceof ScanQuery) { IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); - qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable); + qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable); if (grp != null) qry.projection(grp); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 0749f66..8ac3809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> } /** + * @return Keys belonging to partition. 
+ */ + public Set<KeyCacheObject> keySet() { + return map.keySet(); + } + + /** * @return Entries belonging to partition. */ public Collection<GridDhtCacheEntry> entries() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 0658828..2d2db1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -76,7 +76,7 @@ import org.jetbrains.annotations.*; * </li> * <li> * Joins will work correctly only if joined objects are stored in - * collocated mode or at least one side of the join is stored in + * colocated mode or at least one side of the join is stored in * {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to * {@link AffinityKey} javadoc for more information about colocation. 
* </li> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index a579aab..2b93144 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage false, null, req.keyValueFilter(), + req.partition(), req.className(), req.clause(), req.includeMetaData(), @@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().clause(), clsName, qry.query().scanFilter(), + qry.query().partition(), qry.reducer(), qry.transform(), qry.query().pageSize(), @@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().clause(), null, null, + null, qry.reducer(), qry.transform(), qry.query().pageSize(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 4b1fc87..d976d2c 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -56,6 +56,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** */ private final IgniteBiPredicate<Object, Object> filter; + /** Partition. */ + private Integer part; + /** */ private final boolean incMeta; @@ -95,6 +98,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param clsName Class name. * @param clause Clause. * @param filter Scan filter. + * @param part Partition. * @param incMeta Include metadata flag. * @param keepPortable Keep portable flag. */ @@ -103,6 +107,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate<Object, Object> filter, + @Nullable Integer part, boolean incMeta, boolean keepPortable) { assert cctx != null; @@ -113,6 +118,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.clsName = clsName; this.clause = clause; this.filter = filter; + this.part = part; this.incMeta = incMeta; this.keepPortable = keepPortable; @@ -132,6 +138,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param dedup Enable dedup flag. * @param prj Grid projection. * @param filter Key-value filter. + * @param part Partition. * @param clsName Class name. * @param clause Clause. * @param incMeta Include metadata flag. 
@@ -149,6 +156,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { boolean dedup, ClusterGroup prj, IgniteBiPredicate<Object, Object> filter, + @Nullable Integer part, @Nullable String clsName, String clause, boolean incMeta, @@ -165,6 +173,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.dedup = dedup; this.prj = prj; this.filter = filter; + this.part = part; this.clsName = clsName; this.clause = clause; this.incMeta = incMeta; @@ -334,6 +343,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } /** + * @return Partition. + */ + @Nullable public Integer partition() { + return part; + } + + /** * @throws IgniteCheckedException If query is invalid. */ public void validate() throws IgniteCheckedException { @@ -448,14 +464,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { case REPLICATED: if (prj != null) - return nodes(cctx, prj); + return nodes(cctx, prj, partition()); return cctx.affinityNode() ? Collections.singletonList(cctx.localNode()) : - Collections.singletonList(F.rand(nodes(cctx, null))); + Collections.singletonList(F.rand(nodes(cctx, null, partition()))); case PARTITIONED: - return nodes(cctx, prj); + return nodes(cctx, prj, partition()); default: throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode); @@ -467,13 +483,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param prj Projection (optional). * @return Collection of data nodes in provided projection (if any). 
*/ - private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) { + private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, + @Nullable final ClusterGroup prj, @Nullable final Integer part) { assert cctx != null; return F.view(CU.allNodes(cctx), new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { return cctx.discovery().cacheAffinityNode(n, cctx.name()) && - (prj == null || prj.node(n.id()) != null); + (prj == null || prj.node(n.id()) != null) && + (part == null || cctx.affinity().primary(n , part, cctx.affinity().affinityTopologyVersion())); } }); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 16a8028..fac3d8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -52,6 +52,8 @@ import java.util.concurrent.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.GridClosureCallMode.*; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*; /** @@ -111,8 +113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final Object recipient = 
recipient(nodeId, entry.getKey()); entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() { - @Override - public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) + @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException { f.get().closeIfNotShared(recipient); } @@ -768,98 +769,139 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final boolean backups = qry.includeBackups() || cctx.isReplicated(); - final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { - private IgniteBiTuple<K, V> next; + final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = + new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { + private IgniteBiTuple<K, V> next; - private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); + private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator(); + private Iterator<K> iter; - { - advance(); - } + private GridDhtLocalPartition locPart; - @Override public boolean onHasNext() { - return next != null; - } + { + Integer part = qry.partition(); - @Override public IgniteBiTuple<K, V> onNext() { - if (next == null) - throw new NoSuchElementException(); + if (part == null || dht == null) + iter = backups ? 
prj.keySet().iterator() : prj.primaryKeySet().iterator(); + else if (part < 0 || part >= cctx.affinity().partitions()) + iter = F.emptyIterator(); + else { + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - IgniteBiTuple<K, V> next0 = next; + locPart = dht.topology().localPartition(part, topVer, false); - advance(); + if (locPart == null || (locPart.state() != OWNING && locPart.state() != RENTING) || + !locPart.reserve()) + throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved"); - return next0; - } - private void advance() { - IgniteBiTuple<K, V> next0 = null; + iter = new Iterator<K>() { + private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator(); - while (iter.hasNext()) { - next0 = null; + @Override public boolean hasNext() { + return iter0.hasNext(); + } - K key = iter.next(); + @Override public K next() { + KeyCacheObject key = iter0.next(); - V val; + return key.value(cctx.cacheObjectContext(), false); + } - try { - val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + @Override public void remove() { + iter0.remove(); + } + }; } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to peek value: " + e); - val = null; - } + advance(); + } - if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { - dht.sendTtlUpdateRequest(expiryPlc); + @Override public boolean onHasNext() { + return next != null; + } - expiryPlc = cctx.cache().expiryPolicy(plc); - } + @Override public IgniteBiTuple<K, V> onNext() { + if (next == null) + throw new NoSuchElementException(); - if (val != null) { - next0 = F.t(key, val); + IgniteBiTuple<K, V> next0 = next; - if (checkPredicate(next0)) - break; - else - next0 = null; - } + advance(); + + return next0; } - next = next0 != null ? 
- new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : - null; + private void advance() { + IgniteBiTuple<K, V> next0 = null; - if (next == null) - sendTtlUpdate(); - } + while (iter.hasNext()) { + next0 = null; - @Override protected void onClose() { - sendTtlUpdate(); - } + K key = iter.next(); + + V val; + + try { + val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to peek value: " + e); + + val = null; + } + + if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = cctx.cache().expiryPolicy(plc); + } + + if (val != null) { + next0 = F.t(key, val); + + if (checkPredicate(next0)) + break; + else + next0 = null; + } + } - private void sendTtlUpdate() { - if (dht != null && expiryPlc != null) { - dht.sendTtlUpdateRequest(expiryPlc); + next = next0 != null ? + new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : + null; - expiryPlc = null; + if (next == null) + sendTtlUpdate(); } - } - private boolean checkPredicate(Map.Entry<K, V> e) { - if (keyValFilter != null) { - Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + @Override protected void onClose() { + sendTtlUpdate(); - return keyValFilter.apply(e0.getKey(), e0.getValue()); + if (locPart != null) + locPart.release(); } - return true; - } - }; + private void sendTtlUpdate() { + if (dht != null && expiryPlc != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } + } + + private boolean checkPredicate(Map.Entry<K, V> e) { + if (keyValFilter != null) { + Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + + return keyValFilter.apply(e0.getKey(), e0.getValue()); + } + + return true; + } + }; final GridIterator<IgniteBiTuple<K, V>> it; @@ -914,7 +956,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throws 
IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); - Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups); + Integer part = qry.partition(); + + Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) : + cctx.swap().rawSwapIterator(part); return scanIterator(it, filter, qry.keepPortable()); } @@ -930,10 +975,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (cctx.offheapTiered() && filter != null) { OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable()); - return cctx.swap().rawOffHeapIterator(c, true, backups); + return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups); } else { - Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups); + Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups); return scanIterator(it, filter, qry.keepPortable()); } @@ -1222,7 +1267,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { // Preparing query closures. 
- IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer(); + IgniteClosure<Map.Entry<K, V>, Object> trans = + (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer(); + IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer(); injectResources(trans); @@ -1529,11 +1576,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false, qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId()))); } - catch (Error e) { - fut.onDone(e); - - throw e; - } catch (Throwable e) { fut.onDone(e); @@ -1843,7 +1885,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return new IgniteBiPredicate<K, V>() { @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE); + return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE); } }; } @@ -2920,6 +2962,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, null, null, + null, false, keepPortable); } @@ -2928,17 +2971,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * Creates user's predicate based scan query. * * @param filter Scan filter. + * @param part Partition. * @param keepPortable Keep portable flag. * @return Created query. 
*/ - @SuppressWarnings("unchecked") public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, - boolean keepPortable) { + @Nullable Integer part, boolean keepPortable) { + return new GridCacheQueryAdapter<>(cctx, SCAN, null, null, (IgniteBiPredicate<Object, Object>)filter, + part, false, keepPortable); } @@ -2962,6 +3007,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte clsName, search, null, + null, false, keepPortable); } @@ -2982,6 +3028,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, qry, null, + null, false, keepPortable); } @@ -3002,6 +3049,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, qry, null, + null, incMeta, keepPortable); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 845077f..7577954 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -26,6 +26,8 @@ import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + import java.io.*; import java.nio.*; import java.util.*; @@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** */ private int taskHash; + /** Partition. 
*/ + private Integer part; + /** * Required by {@link Externalizable} */ @@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache * @param clause Query clause. * @param clsName Query class name. * @param keyValFilter Key-value filter. + * @param part Partition. * @param rdc Reducer. * @param trans Transformer. * @param pageSize Page size. @@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache String clause, String clsName, IgniteBiPredicate<Object, Object> keyValFilter, + @Nullable Integer part, IgniteReducer<Object, Object> rdc, IgniteClosure<Object, Object> trans, int pageSize, @@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache this.clause = clause; this.clsName = clsName; this.keyValFilter = keyValFilter; + this.part = part; this.rdc = rdc; this.trans = trans; this.pageSize = pageSize; @@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache return taskHash; } + /** + * @return partition. + */ + @Nullable public Integer partition() { + return part; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -537,6 +552,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); + case 21: + if (!writer.writeInt("part", part != null ? part : -1)) + return false; + + writer.incrementState(); } return true; @@ -701,6 +721,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); + case 21: + int part0 = reader.readInt("part"); + + part = part0 == -1 ? 
null : part0; + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return true; @@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 21; + return 22; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index f516968..c0e763f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -114,7 +114,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), false, false); + new GridSetQueryPredicate<>(id, collocated), -1, false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); @@ -345,7 +345,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite private GridCloseableIterator<T> iterator0() { try { CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), false, false); + new GridSetQueryPredicate<>(id, collocated), null, false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
index 9d41074..4601586 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
@@ -176,7 +176,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo
             for (int j = 0; j < G.allGrids().size(); j++) {
                 GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
 
-                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
 
                 int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
                     @IgniteInstanceResource

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
index 62bf3f7..cc8217d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
@@ -179,7 +179,7 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
             for (int j = 0; j < G.allGrids().size(); j++) {
                 GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
 
-                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
 
                 final int i0 = j;
                 final int j0 = i;
@@ -207,8 +207,8 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
                             Object v1 = e.getValue();
                             Object v2 = ((IgniteKernal)grid).getCache("one").get(key);
 
-                            assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 +
-                                ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
+                            assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" +
+                                key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
                             assertEquals(v1, v2);
                         }
                         catch (IgniteCheckedException e1) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
index 068a46c..6ccfbc2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
@@ -115,49 +115,81 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     public void testQuery() throws Exception {
-        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME));
+        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false);
 
-        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME));
+        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false);
+
+        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true);
+
+        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true);
     }
 
     /**
      * @param cache Cache.
+     * @param scanPartitions Scan partitions.
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkQuery(GridCacheAdapter cache) throws Exception {
+    private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception {
         final int ENTRY_CNT = 500;
 
-        for (int i = 0; i < ENTRY_CNT; i++)
-            cache.getAndPut(new Key(i), new Person("p-" + i, i));
+        Map<Integer, Map<Key, Person>> entries = new HashMap<>();
+
+        for (int i = 0; i < ENTRY_CNT; i++) {
+            Key key = new Key(i);
+            Person val = new Person("p-" + i, i);
+
+            int part = cache.context().affinity().partition(key);
+
+            cache.getAndPut(key, val);
+
+            Map<Key, Person> partEntries = entries.get(part);
+
+            if (partEntries == null)
+                entries.put(part, partEntries = new HashMap<>());
+
+            partEntries.put(key, val);
+        }
 
         try {
-            CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
-                new IgniteBiPredicate<Key, Person>() {
-                    @Override public boolean apply(Key key, Person p) {
-                        assertEquals(key.id, (Integer)p.salary);
+            int partitions = scanPartitions ? cache.context().affinity().partitions() : 1;
 
-                        return key.id % 2 == 0;
-                    }
-                }, false);
+            for (int i = 0; i < partitions; i++) {
+                CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
+                    new IgniteBiPredicate<Key, Person>() {
+                        @Override public boolean apply(Key key, Person p) {
+                            assertEquals(key.id, (Integer)p.salary);
 
-            Collection<Map.Entry<Key, Person>> res = qry.execute().get();
+                            return key.id % 2 == 0;
+                        }
+                    }, (scanPartitions ? i : null), false);
 
-            assertEquals(ENTRY_CNT / 2, res.size());
+                Collection<Map.Entry<Key, Person>> res = qry.execute().get();
 
-            for (Map.Entry<Key, Person> e : res) {
-                Key k = e.getKey();
-                Person p = e.getValue();
+                if (!scanPartitions)
+                    assertEquals(ENTRY_CNT / 2, res.size());
 
-                assertEquals(k.id, (Integer)p.salary);
-                assertEquals(0, k.id % 2);
-            }
+                for (Map.Entry<Key, Person> e : res) {
+                    Key k = e.getKey();
+                    Person p = e.getValue();
 
-            qry = cache.context().queries().createScanQuery(null, false);
+                    assertEquals(k.id, (Integer)p.salary);
+                    assertEquals(0, k.id % 2);
 
-            res = qry.execute().get();
+                    if (scanPartitions) {
+                        Map<Key, Person> partEntries = entries.get(i);
 
-            assertEquals(ENTRY_CNT, res.size());
+                        assertEquals(p, partEntries.get(k));
+                    }
+                }
+
+                qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false);
+
+                res = qry.execute().get();
+
+                if (!scanPartitions)
+                    assertEquals(ENTRY_CNT, res.size());
+            }
 
             testMultithreaded(cache, ENTRY_CNT / 2);
         }
@@ -185,7 +217,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                             return key.id % 2 == 0;
                         }
-                    }, false);
+                    }, null, false);
 
                 for (int i = 0; i < 250; i++) {
                     Collection<Map.Entry<Key, Person>> res = qry.execute().get();
@@ -229,7 +261,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                         return val % 2 == 0;
                     }
-                }, false);
+                }, null, false);
 
             Collection<Map.Entry<String, Long>> res = qry.execute().get();
 
@@ -244,7 +276,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                 assertEquals(0, val % 2);
             }
 
-            qry = cache.context().queries().createScanQuery(null, false);
+            qry = cache.context().queries().createScanQuery(null, null, false);
 
             res = qry.execute().get();
 
@@ -284,7 +316,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                         return key % 2 == 0;
                     }
-                }, false);
+                }, null, false);
 
             Collection<Map.Entry<Integer, byte[]>> res = qry.execute().get();
 
@@ -299,7 +331,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                 assertEquals(0, key % 2);
             }
 
-            qry = cache.context().queries().createScanQuery(null, false);
+            qry = cache.context().queries().createScanQuery(null, null, false);
 
             res = qry.execute().get();
 
@@ -367,5 +399,29 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
             this.name = name;
             this.salary = salary;
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Person person = (Person)o;
+
+            if (salary != person.salary)
+                return false;
+
+            return !(name != null ? !name.equals(person.name) : person.name != null);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = name != null ? name.hashCode() : 0;
+
+            return 31 * result + salary;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 1a60bbd..eb5027c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -61,6 +61,9 @@ import static org.junit.Assert.*;
  * Various tests for cache queries.
  */
 public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstractTest {
+    /** Key count. */
+    private static final int KEY_CNT = 5000;
+
     /** Cache store. */
     private static TestStore store = new TestStore();
 
@@ -643,6 +646,47 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
     }
 
     /**
+     * @throws Exception In case of error.
+     */
+    public void testScanPartitionQuery() throws Exception {
+        IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+        GridCacheContext cctx = ((IgniteCacheProxy)cache).context();
+
+        Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+
+        for (int i = 0; i < KEY_CNT; i++) {
+            cache.put(i, i);
+
+            int part = cctx.affinity().partition(i);
+
+            Map<Integer, Integer> partEntries = entries.get(part);
+
+            if (partEntries == null)
+                entries.put(part, partEntries = new HashMap<>());
+
+            partEntries.put(i, i);
+        }
+
+        for (int i = 0; i < cctx.affinity().partitions(); i++) {
+            CacheQuery<Map.Entry<Integer, Integer>> qry =
+                ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false);
+
+            CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+            Map<Integer, Integer> exp = entries.get(i);
+
+            Collection<Map.Entry<Integer, Integer>> actual = fut.get();
+
+            if (exp == null)
+                assertTrue(actual.isEmpty());
+            else
+                for (Map.Entry<Integer, Integer> entry : actual)
+                    assertTrue(entry.getValue().equals(exp.get(entry.getKey())));
+        }
+    }
+
+    /**
      * JUnit.
      *
      * @throws Exception In case of error.
@@ -1048,11 +1092,13 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
         for (int i = 0; i < 20; i++)
             cache.put(i, i);
 
-        QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(new IgniteBiPredicate<Integer,Integer>() {
+        IgniteBiPredicate<Integer, Integer> filter = new IgniteBiPredicate<Integer, Integer>() {
             @Override public boolean apply(Integer k, Integer v) {
                 return k >= 10;
             }
-        }));
+        };
+
+        QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(filter));
 
         q.getAll();
 
@@ -1187,7 +1233,8 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
      * @return {@code true} if index has a table for given class.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr) throws IgniteCheckedException {
+    private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr)
+        throws IgniteCheckedException {
         return qryMgr.size(cls) != -1;
     }