# ignite-57
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b1959a30 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b1959a30 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b1959a30 Branch: refs/heads/ignite-sql-tests Commit: b1959a30ec3beccb925e65cc7d0fb46f42f7b66a Parents: 7a997ad Author: sboikov <sboi...@gridgain.com> Authored: Fri Feb 6 14:44:57 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Feb 6 16:27:02 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheEntryImpl0.java | 60 ++++ .../cache/CacheWeakQueryIteratorsHolder.java | 4 +- .../processors/cache/GridCacheAdapter.java | 142 +++++++- .../processors/cache/GridCacheSwapManager.java | 160 +++++++++ .../distributed/dht/GridDhtCacheAdapter.java | 80 ++++- .../distributed/near/GridNearCacheAdapter.java | 8 + .../ignite/internal/util/lang/GridFunc.java | 149 ++++++++ .../cache/IgniteCachePeekModesAbstractTest.java | 359 +++++++++++++++++-- 8 files changed, 933 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java new file mode 100644 index 0000000..05c30c3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import javax.cache.*; +import java.util.*; + +/** + * + */ +public class CacheEntryImpl0<K, V> implements Cache.Entry<K, V> { + /** */ + private final Map.Entry<K, V> e; + + /** + * @param e Entry. + */ + public CacheEntryImpl0(Map.Entry<K, V> e) { + this.e = e; + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return e.getKey(); + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return e.getValue(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> cls) { + if (!cls.equals(getClass())) + throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); + + return (T)this; + } + + /** {@inheritDoc} */ + public String toString() { + return "CacheEntry [key=" + getKey() + ", val=" + getValue() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java index 902cf12..4e19a0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java @@ -123,7 +123,8 @@ public class CacheWeakQueryIteratorsHolder<V> { /** Weak reference. */ private final WeakReference<WeakQueryFutureIterator<T>> weakRef; - CacheIteratorConverter<T, V> convert; + /** */ + private final CacheIteratorConverter<T, V> convert; /** Init flag. */ private boolean init; @@ -136,6 +137,7 @@ public class CacheWeakQueryIteratorsHolder<V> { /** * @param fut GridCacheQueryFuture to iterate. + * @param convert Converter. */ WeakQueryFutureIterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { this.fut = fut; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index be74645..68b846b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -663,7 +663,50 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, PeekModes modes = parsePeekModes(peekModes); - return null; + List<Iterator<Cache.Entry<K, V>>> its = new ArrayList<>(); + + if (ctx.isLocal()) { + modes.primary = true; + modes.backup = true; + + if (modes.heap) + its.add(iterator(map.entries0().iterator(), !ctx.keepPortable())); + } + else if (modes.heap) { + if (modes.near && ctx.isNear()) + its.add(ctx.near().nearEntriesIterator()); + + if (modes.primary || modes.backup) { + GridDhtCacheAdapter<K, V> cache = ctx.isNear() ? ctx.near().dht() : ctx.dht(); + + its.add(cache.localEntriesIterator(modes.primary, modes.backup)); + } + } + + // Swap and offheap are disabled for near cache. + if (modes.primary || modes.backup) { + long topVer = ctx.affinity().affinityTopologyVersion(); + + GridCacheSwapManager<K, V> swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); + + if (modes.swap) + its.add(swapMgr.swapIterator(modes.primary, modes.backup, topVer)); + + if (modes.offheap) + its.add(swapMgr.offheapIterator(modes.primary, modes.backup, topVer)); + } + + final Iterator<Cache.Entry<K, V>> it = F.flatIterators(its); + + return new Iterable<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return it; + } + + public String toString() { + return "CacheLocalEntries []"; + } + }; } /** {@inheritDoc} */ @@ -5274,15 +5317,112 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** + * @param it Internal entry iterator. + * @param deserializePortable Deserialize portable flag. + * @return Public API iterator. + */ + protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<GridCacheEntryEx<K, V>> it, + final boolean deserializePortable) { + return new Iterator<Cache.Entry<K, V>>() { + { + advance(); + } + + /** */ + private Cache.Entry<K, V> next; + + @Override public boolean hasNext() { + return next != null; + } + + @Override public Cache.Entry<K, V> next() { + if (next == null) + throw new NoSuchElementException(); + + Cache.Entry<K, V> e = next; + + advance(); + + return e; + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Switch to next entry. + */ + private void advance() { + next = null; + + while (it.hasNext()) { + GridCacheEntryEx<K, V> entry = it.next(); + + try { + V val = entry.innerGet( + null, + false, + false, + false, + true, + false, + false, + false, + null, + null, + null, + null, + null); + + if (val == null) + continue; + + K key = entry.key(); + + if (deserializePortable && ctx.portableEnabled()) { + key = (K)ctx.unwrapPortableIfNeeded(key, true); + val = (V)ctx.unwrapPortableIfNeeded(val, true); + } + + next = new CacheEntryImpl<>(key, val); + + break; + } + catch (IgniteCheckedException e) { + throw U.convertToCacheException(e); + } + catch (GridCacheEntryRemovedException ignore) { + // No-op. + } + catch (GridCacheFilterFailedException ignore) { + assert false; + } + } + } + }; + } + + /** * */ private static class PeekModes { + /** */ boolean near; + + /** */ boolean primary; + + /** */ boolean backup; + /** */ boolean heap; + + /** */ boolean offheap; + + /** */ boolean swap; /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 419fdf5..affd27c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -33,6 +33,7 @@ import org.apache.ignite.spi.swapspace.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.*; import java.lang.ref.*; import java.nio.*; import java.util.*; @@ -1511,6 +1512,66 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> { } /** + * @param primary If {@code true} includes primary entries. + * @param backup If {@code true} includes backup entries. + * @param topVer Topology version. + * @return Swap entries iterator. + * @throws IgniteCheckedException If failed. + */ + public Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, long topVer) + throws IgniteCheckedException + { + assert primary || backup; + + if (!swapEnabled) + return F.emptyIterator(); + + if (primary && backup) + return cacheEntryIterator(lazySwapIterator()); + + Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) : + cctx.affinity().backupPartitions(cctx.localNodeId(), topVer); + + return new PartitionsIterator(parts) { + @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part) + throws IgniteCheckedException + { + return swapMgr.rawIterator(spaceName, part); + } + }; + } + + /** + * @param primary If {@code true} includes primary entries. + * @param backup If {@code true} includes backup entries. + * @param topVer Topology version. + * @return Offheap entries iterator. + * @throws IgniteCheckedException If failed. + */ + public Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, long topVer) + throws IgniteCheckedException + { + assert primary || backup; + + if (!offheapEnabled) + return F.emptyIterator(); + + if (primary && backup) + return cacheEntryIterator(lazyOffHeapIterator()); + + Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) : + cctx.affinity().backupPartitions(cctx.localNodeId(), topVer); + + return new PartitionsIterator(parts) { + @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part) + throws IgniteCheckedException + { + return offheap.iterator(spaceName, part); + } + }; + } + + /** * @param ldr Undeployed class loader. * @return Undeploy count. */ @@ -1611,6 +1672,19 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> { } /** + * @param it Map.Entry iterator. + * @return Cache.Entry iterator. + */ + private static <K, V> Iterator<Cache.Entry<K, V>> cacheEntryIterator(Iterator<Map.Entry<K, V>> it) { + return F.iterator(it, new C1<Map.Entry<K, V>, Cache.Entry<K, V>>() { + @Override public Cache.Entry<K, V> apply(Map.Entry<K, V> e) { + // Create Cache.Entry over Map.Entry to do not deserialize key/values if not needed. + return new CacheEntryImpl0<>(e); + } + }, true); + } + + /** * */ private class IteratorWrapper extends GridCloseableIteratorAdapter<Map.Entry<byte[], GridCacheSwapEntry<V>>> { @@ -1688,4 +1762,90 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> { e.valueClassLoaderId()); } } + + /** + * + */ + private abstract class PartitionsIterator implements Iterator<Cache.Entry<K, V>> { + /** */ + private Iterator<Integer> partIt; + + /** */ + private Iterator<Cache.Entry<K, V>> curIt; + + /** */ + private Cache.Entry<K, V> next; + + /** + * @param parts Partitions + */ + public PartitionsIterator(Collection<Integer> parts) { + this.partIt = parts.iterator(); + + advance(); + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return next != null; + } + + /** {@inheritDoc} */ + @Override public Cache.Entry<K, V> next() { + if (next == null) + throw new NoSuchElementException(); + + Cache.Entry<K, V> e = next; + + advance(); + + return e; + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Switches to next element. + */ + private void advance() { + next = null; + + do { + if (curIt == null) { + if (partIt.hasNext()) { + int part = partIt.next(); + + try { + curIt = cacheEntryIterator(lazyIterator(partitionIterator(part))); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + + if (curIt != null) { + if (curIt.hasNext()) { + next = curIt.next(); + + break; + } + else + curIt = null; + } + } + while (partIt.hasNext()); + } + + /** + * @param part Partition. + * @return Iterator for given partition. + * @throws IgniteCheckedException If failed. + */ + abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part) + throws IgniteCheckedException; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index e673641..f4d2d0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -35,6 +35,7 @@ import org.apache.ignite.lang.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -720,7 +721,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap ctx.io().send(req.getKey(), req.getValue()); } catch (IgniteCheckedException e) { - log.error("Failed to send TTL update request.", e); + U.error(log, "Failed to send TTL update request.", e); } } } @@ -911,6 +912,83 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** + * @param primary If {@code true} includes primary entries. + * @param backup If {@code true} includes backup entries. + * @return Local entries iterator. + */ + public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, final boolean backup) { + assert primary || backup; + + if (primary && backup) + return iterator(map.entries0().iterator(), !ctx.keepPortable()); + else { + final long topVer = ctx.affinity().affinityTopologyVersion(); + + final Iterator<GridDhtLocalPartition<K, V>> partIt = topology().currentLocalPartitions().iterator(); + + Iterator<GridCacheEntryEx<K, V>> it = new Iterator<GridCacheEntryEx<K, V>>() { + private GridCacheEntryEx<K, V> next; + + private Iterator<GridDhtCacheEntry<K, V>> curIt; + + { + advance(); + } + + @Override public boolean hasNext() { + return next != null; + } + + @Override public GridCacheEntryEx<K, V> next() { + if (next == null) + throw new NoSuchElementException(); + + GridCacheEntryEx<K, V> e = next; + + advance(); + + return e; + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + + private void advance() { + next = null; + + do { + if (curIt == null) { + while (partIt.hasNext()) { + GridDhtLocalPartition<K, V> part = partIt.next(); + + if (primary == part.primary(topVer)) { + curIt = part.entries().iterator(); + + break; + } + } + } + + if (curIt != null) { + if (curIt.hasNext()) { + next = curIt.next(); + + break; + } + else + curIt = null; + } + } + while (partIt.hasNext()); + } + }; + + return iterator(it, !ctx.keepPortable()); + } + } + + /** * Complex partition iterator for both partition and swap iteration. */ private static class PartitionEntryIterator<K, V> extends GridIteratorAdapter<CacheEntry<K, V>> { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index abb1694..74d0f1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; +import javax.cache.*; import javax.cache.expiry.*; import java.io.*; import java.util.*; @@ -780,6 +781,13 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } } + /** + * @return Near entries iterator. + */ + public Iterator<Cache.Entry<K, V>> nearEntriesIterator() { + return iterator(map.entries0().iterator(), !ctx.keepPortable()); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearCacheAdapter.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 77df627..6f31976 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -3091,6 +3091,65 @@ public class GridFunc { } /** + * Flattens iterable-of-iterators and returns iterator over the + * elements of the inner collections. This method doesn't create any + * new collections or copies any elements. + * + * @param c Input iterable of iterators. + * @return Iterator over the elements of given iterators. + */ + public static <T> Iterator<T> flatIterators(@Nullable final Iterable<Iterator<T>> c) { + return isEmpty(c) ? GridFunc.<T>emptyIterator() : new GridIteratorAdapter<T>() { + /** */ + private Iterator<? extends Iterator<T>> a = c.iterator(); + + /** */ + private Iterator<T> b; + + /** */ + private boolean moved = true; + + /** */ + private boolean more; + + @Override public boolean hasNextX() { + if (!moved) + return more; + + moved = false; + + if (b != null && b.hasNext()) + return more = true; + + while (a.hasNext()) { + b = a.next(); + + if (b.hasNext()) + return more = true; + } + + return more = false; + } + + @Override public T nextX() { + if (hasNext()) { + moved = true; + + return b.next(); + } + + throw new NoSuchElementException(); + } + + @Override public void removeX() { + assert b != null; + + b.remove(); + } + }; + } + + /** * Flattens given set objects into a single collection. Unrolls {@link Collection}, * {@link Iterable} and {@code Object[]} objects. * @@ -4313,6 +4372,96 @@ public class GridFunc { } /** + * @param c Input iterator. + * @param trans Transforming closure to convert from T1 to T2. + * @param readOnly If {@code true}, then resulting iterator will not allow modifications + * to the underlying collection. + * @param p Optional filtering predicates. + * @return Iterator from given iterator and optional filtering predicate. + */ + public static <T1, T2> Iterator<T2> iterator(final Iterator<? extends T1> c, + final IgniteClosure<? super T1, T2> trans, + final boolean readOnly, + @Nullable final IgnitePredicate<? super T1>... p) + { + A.notNull(c, "c", trans, "trans"); + + if (isAlwaysFalse(p)) + return F.emptyIterator(); + + return new GridIteratorAdapter<T2>() { + /** */ + private T1 elem; + + /** */ + private boolean moved = true; + + /** */ + private boolean more; + + /** */ + private Iterator<? extends T1> iter = c; + + @Override public boolean hasNextX() { + if (isEmpty(p)) + return iter.hasNext(); + else { + if (!moved) + return more; + else { + more = false; + + while (iter.hasNext()) { + elem = iter.next(); + + boolean isAll = true; + + for (IgnitePredicate<? super T1> r : p) + if (r != null && !r.apply(elem)) { + isAll = false; + + break; + } + + if (isAll) { + more = true; + moved = false; + + return true; + } + } + + elem = null; // Give to GC. + + return false; + } + } + } + + @Nullable @Override public T2 nextX() { + if (isEmpty(p)) + return trans.apply(iter.next()); + else { + if (hasNext()) { + moved = true; + + return trans.apply(elem); + } + else + throw new NoSuchElementException(); + } + } + + @Override public void removeX() { + if (readOnly) + throw new UnsupportedOperationException("Cannot modify read-only iterator."); + + iter.remove(); + } + }; + } + + /** * Gets predicate that always returns {@code true}. This method returns * constant predicate. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java index 653b064..b1fd167 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.swapspace.file.*; +import javax.cache.*; import java.util.*; import static org.apache.ignite.cache.CacheDistributionMode.*; @@ -45,7 +46,10 @@ import static org.apache.ignite.cache.CachePeekMode.*; */ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstractTest { /** */ - private static final int HEAP_ENTRIES = 10; + private static final String SPACE_NAME = "gg-swap-cache-dflt"; + + /** */ + private static final int HEAP_ENTRIES = 30; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -227,9 +231,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra Set<Integer> swapKeys = new HashSet<>(); - final String spaceName = "gg-swap-cache-dflt"; - - IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(spaceName, null); + IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(SPACE_NAME, null); assertNotNull(it); @@ -338,6 +340,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra cache0.removeAll(new HashSet<>(keys)); } } + /** * @throws Exception If failed. */ @@ -410,9 +413,9 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra try { int totalKeys = 200; - T2<Integer, Integer> swapKeys = swapKeys(0); + T2<Integer, Integer> swapKeys = swapKeysCount(0); - T2<Integer, Integer> offheapKeys = offheapKeys(0); + T2<Integer, Integer> offheapKeys = offheapKeysCount(0); int totalSwap = swapKeys.get1() + swapKeys.get2(); int totalOffheap = offheapKeys.get1() + offheapKeys.get2(); @@ -445,9 +448,9 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra } } else { - //checkSizeAffinityFilter(0); + checkSizeAffinityFilter(0); - //checkSizeAffinityFilter(1); + checkSizeAffinityFilter(1); checkSizeStorageFilter(0); @@ -608,14 +611,12 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra /** * @param nodeIdx Node index. - * @return Tuple with number of primary and backup keys. + * @return Tuple with primary and backup keys. */ - private T2<Integer, Integer> swapKeys(int nodeIdx) { + private T2<List<Integer>, List<Integer>> swapKeys(int nodeIdx) { FileSwapSpaceSpi swap = (FileSwapSpaceSpi)ignite(nodeIdx).configuration().getSwapSpaceSpi(); - final String spaceName = "gg-swap-cache-dflt"; - - IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(spaceName, null); + IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(SPACE_NAME, null); assertNotNull(it); @@ -623,18 +624,18 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra ClusterNode node = ignite(nodeIdx).cluster().localNode(); - int primary = 0; - int backups = 0; + List<Integer> primary = new ArrayList<>(); + List<Integer> backups = new ArrayList<>(); while (it.hasNext()) { Integer key = it.next(); if (aff.isPrimary(node, key)) - primary++; + primary.add(key); else { assertTrue(aff.isBackup(node, key)); - backups++; + backups.add(key); } } @@ -645,7 +646,17 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra * @param nodeIdx Node index. * @return Tuple with number of primary and backup keys. */ - private T2<Integer, Integer> offheapKeys(int nodeIdx) { + private T2<Integer, Integer> swapKeysCount(int nodeIdx) { + T2<List<Integer>, List<Integer>> keys = swapKeys(nodeIdx); + + return new T2<>(keys.get1().size(), keys.get2().size()); + } + + /** + * @param nodeIdx Node index. + * @return Tuple with primary and backup keys. + */ + private T2<List<Integer>, List<Integer>> offheapKeys(int nodeIdx) { GridCacheAdapter<Integer, String> internalCache = ((IgniteKernal)ignite(nodeIdx)).context().cache().internalCache(); @@ -660,18 +671,18 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra ClusterNode node = ignite(nodeIdx).cluster().localNode(); - int primary = 0; - int backups = 0; + List<Integer> primary = new ArrayList<>(); + List<Integer> backups = new ArrayList<>(); while (offheapIt.hasNext()) { Map.Entry<Integer, String> e = offheapIt.next(); if (aff.isPrimary(node, e.getKey())) - primary++; + primary.add(e.getKey()); else { assertTrue(aff.isBackup(node, e.getKey())); - backups++; + backups.add(e.getKey()); } } @@ -680,6 +691,16 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra /** * @param nodeIdx Node index. + * @return Tuple with number of primary and backup keys. + */ + private T2<Integer, Integer> offheapKeysCount(int nodeIdx) { + T2<List<Integer>, List<Integer>> keys = offheapKeys(nodeIdx); + + return new T2<>(keys.get1().size(), keys.get2().size()); + } + + /** + * @param nodeIdx Node index. * @throws Exception If failed. */ private void checkSizeStorageFilter(int nodeIdx) throws Exception { @@ -698,12 +719,12 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra int totalKeys = 200; - T2<Integer, Integer> swapKeys = swapKeys(nodeIdx); + T2<Integer, Integer> swapKeys = swapKeysCount(nodeIdx); assertTrue(swapKeys.get1() > 0); assertTrue(swapKeys.get2() > 0); - T2<Integer, Integer> offheapKeys = offheapKeys(nodeIdx); + T2<Integer, Integer> offheapKeys = offheapKeysCount(nodeIdx); assertTrue(offheapKeys.get1() > 0); assertTrue(offheapKeys.get2() > 0); @@ -740,12 +761,12 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra int globalOffheapBackup = 0; for (int i = 0; i < gridCount(); i++) { - T2<Integer, Integer> swap = swapKeys(i); + T2<Integer, Integer> swap = swapKeysCount(i); globalSwapPrimary += swap.get1(); globalSwapBackup += swap.get2(); - T2<Integer, Integer> offheap = offheapKeys(i); + T2<Integer, Integer> offheap = offheapKeysCount(i); globalOffheapPrimary += offheap.get1(); globalOffheapBackup += offheap.get2(); @@ -821,4 +842,290 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra assertEquals(exp, size); } + + /** + * @throws Exception If failed. + */ + public void testLocalEntries() throws Exception { + if (cacheMode() == LOCAL) { + IgniteCache<Integer, String> cache0 = jcache(0); + + Set<Integer> keys = new HashSet<>(); + + try { + for (int i = 0; i < HEAP_ENTRIES; i++) { + cache0.put(i, String.valueOf(i)); + + keys.add(i); + } + + checkLocalEntries(cache0.localEntries(), keys); + checkLocalEntries(cache0.localEntries(ALL), keys); + checkLocalEntries(cache0.localEntries(NEAR), keys); + checkLocalEntries(cache0.localEntries(PRIMARY), keys); + checkLocalEntries(cache0.localEntries(BACKUP), keys); + } + finally { + cache0.removeAll(keys); + } + + checkLocalEntries(cache0.localEntries()); + + final String val = "test-val-"; + + keys = new HashSet<>(); + + for (int i = 0; i < 200; i++) { + cache0.put(i, val + i); + + keys.add(i); + } + + try { + int totalKeys = 200; + + T2<List<Integer>, List<Integer>> swapKeys = swapKeys(0); + + T2<List<Integer>, List<Integer>> offheapKeys = offheapKeys(0); + + List<Integer> swap = new ArrayList<>(); + + swap.addAll(swapKeys.get1()); + swap.addAll(swapKeys.get2()); + + assertFalse(swap.isEmpty()); + + List<Integer> offheap = new ArrayList<>(); + + offheap.addAll(offheapKeys.get1()); + offheap.addAll(offheapKeys.get2()); + + assertFalse(offheap.isEmpty()); + + log.info("Keys [total=" + totalKeys + + ", offheap=" + offheap.size() + + ", swap=" + swap.size() + ']'); + + assertTrue(swap.size() + offheap.size() < totalKeys); + + List<Integer> heap = new ArrayList<>(keys); + + heap.removeAll(swap); + heap.removeAll(offheap); + + assertFalse(heap.isEmpty()); + + checkLocalEntries(cache0.localEntries(), val, keys); + checkLocalEntries(cache0.localEntries(ALL), val, keys); + + checkLocalEntries(cache0.localEntries(OFFHEAP), val, offheap); + checkLocalEntries(cache0.localEntries(SWAP), val, swap); + checkLocalEntries(cache0.localEntries(ONHEAP), val, heap); + + checkLocalEntries(cache0.localEntries(OFFHEAP, PRIMARY), val, offheap); + checkLocalEntries(cache0.localEntries(SWAP, PRIMARY), val, swap); + checkLocalEntries(cache0.localEntries(ONHEAP, PRIMARY), val, heap); + + checkLocalEntries(cache0.localEntries(OFFHEAP, BACKUP), val, offheap); + checkLocalEntries(cache0.localEntries(SWAP, BACKUP), val, swap); + checkLocalEntries(cache0.localEntries(ONHEAP, BACKUP), val, heap); + + checkLocalEntries(cache0.localEntries(OFFHEAP, NEAR), val, offheap); + checkLocalEntries(cache0.localEntries(SWAP, NEAR), val, swap); + checkLocalEntries(cache0.localEntries(ONHEAP, NEAR), val, heap); + } + finally { + cache0.removeAll(keys); + } + } + else { + //checkLocalEntriesAffinityFilter(0); + + //checkLocalEntriesAffinityFilter(1); + + checkLocalEntriesStorageFilter(0); + + checkLocalEntriesStorageFilter(1); + } + } + + /** + * @param nodeIdx Node index. + * @throws Exception If failed. + */ + private void checkLocalEntriesStorageFilter(int nodeIdx) throws Exception { + IgniteCache<Integer, String> cache0 = jcache(nodeIdx); + + List<Integer> primaryKeys = primaryKeys(cache0, 100, 10_000); + List<Integer> backupKeys = backupKeys(cache0, 100, 10_000); + + try { + final String val = "test_value-"; + + for (int i = 0; i < 100; i++) { + cache0.put(primaryKeys.get(i), val + primaryKeys.get(i)); + cache0.put(backupKeys.get(i), val + backupKeys.get(i)); + } + + int totalKeys = 200; + + T2<List<Integer>, List<Integer>> swapKeys = swapKeys(nodeIdx); + + assertTrue(swapKeys.get1().size() > 0); + assertTrue(swapKeys.get2().size() > 0); + + T2<List<Integer>, List<Integer>> offheapKeys = offheapKeys(nodeIdx); + + assertTrue(offheapKeys.get1().size() > 0); + assertTrue(offheapKeys.get2().size() > 0); + + List<Integer> swap = new ArrayList<>(); + + swap.addAll(swapKeys.get1()); + swap.addAll(swapKeys.get2()); + + assertFalse(swap.isEmpty()); + + List<Integer> offheap = new ArrayList<>(); + + offheap.addAll(offheapKeys.get1()); + offheap.addAll(offheapKeys.get2()); + + assertFalse(offheap.isEmpty()); + + List<Integer> heap = new ArrayList<>(); + + heap.addAll(primaryKeys); + heap.addAll(backupKeys); + + heap.removeAll(swap); + heap.removeAll(offheap); + + log.info("Keys [total=" + totalKeys + + ", offheap=" + offheap.size() + + ", swap=" + swap.size() + ']'); + + assertFalse(heap.isEmpty()); + + checkLocalEntries(cache0.localEntries(), val, primaryKeys, backupKeys); + checkLocalEntries(cache0.localEntries(ALL), val, primaryKeys, backupKeys); + checkLocalEntries(cache0.localEntries(ONHEAP, OFFHEAP, SWAP), val, primaryKeys, backupKeys); + + checkLocalEntries(cache0.localEntries(SWAP), val, swap); + checkLocalEntries(cache0.localEntries(OFFHEAP), val, offheap); + checkLocalEntries(cache0.localEntries(ONHEAP), val, heap); + + checkLocalEntries(cache0.localEntries(SWAP, OFFHEAP), val, swap, offheap); + checkLocalEntries(cache0.localEntries(SWAP, ONHEAP), val, swap, heap); + + checkLocalEntries(cache0.localEntries(SWAP, PRIMARY), val, swapKeys.get1()); + checkLocalEntries(cache0.localEntries(SWAP, BACKUP), val, swapKeys.get2()); + checkLocalEntries(cache0.localEntries(OFFHEAP, PRIMARY), val, offheapKeys.get1()); + checkLocalEntries(cache0.localEntries(OFFHEAP, BACKUP), val, offheapKeys.get2()); + + checkLocalEntries(cache0.localEntries(SWAP, OFFHEAP, PRIMARY), val, swapKeys.get1(), offheapKeys.get1()); + checkLocalEntries(cache0.localEntries(SWAP, OFFHEAP, BACKUP), val, swapKeys.get2(), offheapKeys.get2()); + checkLocalEntries(cache0.localEntries(SWAP, OFFHEAP, PRIMARY, BACKUP), val, swap, offheap); + } + finally { + cache0.removeAll(new HashSet<>(primaryKeys)); + cache0.removeAll(new HashSet<>(backupKeys)); + } + } + + /** + * @param nodeIdx Node index. + * @throws Exception If failed. + */ + private void checkLocalEntriesAffinityFilter(int nodeIdx) throws Exception { + IgniteCache<Integer, String> cache0 = jcache(nodeIdx); + + final int PUT_KEYS = 10; + + List<Integer> primaryKeys = null; + List<Integer> backupKeys = null; + List<Integer> nearKeys = null; + + try { + primaryKeys = primaryKeys(cache0, PUT_KEYS, 0); + backupKeys = backupKeys(cache0, PUT_KEYS, 0); + + for (Integer key : primaryKeys) + cache0.put(key, String.valueOf(key)); + for (Integer key : backupKeys) + cache0.put(key, String.valueOf(key)); + + nearKeys = cacheMode() == PARTITIONED ? nearKeys(cache0, PUT_KEYS, 0) : Collections.<Integer>emptyList(); + + for (Integer key : nearKeys) + cache0.put(key, String.valueOf(key)); + + log.info("Keys [near=" + nearKeys + ", primary=" + primaryKeys + ", backup=" + backupKeys + ']'); + + boolean hasNearCache = nodeIdx == 0 && cacheMode() == PARTITIONED; + + if (hasNearCache) { + checkLocalEntries(cache0.localEntries(), nearKeys, primaryKeys, backupKeys); + checkLocalEntries(cache0.localEntries(ALL), nearKeys, primaryKeys, backupKeys); + checkLocalEntries(cache0.localEntries(NEAR), nearKeys); + checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP, NEAR), nearKeys, primaryKeys, backupKeys); + checkLocalEntries(cache0.localEntries(NEAR, PRIMARY), nearKeys, primaryKeys); + checkLocalEntries(cache0.localEntries(NEAR, BACKUP), nearKeys, backupKeys); + } + else { + checkLocalEntries(cache0.localEntries(), primaryKeys, backupKeys); + checkLocalEntries(cache0.localEntries(ALL), primaryKeys, backupKeys); + checkLocalEntries(cache0.localEntries(NEAR)); + checkLocalEntries(cache0.localEntries(NEAR, PRIMARY), primaryKeys); + checkLocalEntries(cache0.localEntries(NEAR, BACKUP), backupKeys); + checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP, NEAR), primaryKeys, backupKeys); + } + + checkLocalEntries(cache0.localEntries(PRIMARY), primaryKeys); + checkLocalEntries(cache0.localEntries(BACKUP), backupKeys); + checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP), primaryKeys, backupKeys); + } + finally { + if (primaryKeys != null) + cache0.removeAll(new HashSet<>(primaryKeys)); + + if (backupKeys != null) + cache0.removeAll(new HashSet<>(backupKeys)); + + if (nearKeys != null) + cache0.removeAll(new HashSet<>(nearKeys)); + } + } + + /** + * @param entries Entries. + * @param exp Expected entries. + */ + private void checkLocalEntries(Iterable<Cache.Entry<Integer, String>> entries, Collection<Integer>... exp) { + checkLocalEntries(entries, "", exp); + } + + /** + * @param entries Entries. + * @param expVal Expected value. + * @param exp Expected keys. + */ + private void checkLocalEntries(Iterable<Cache.Entry<Integer, String>> entries, + String expVal, + Collection<Integer>... exp) { + Set<Integer> allExp = new HashSet<>(); + + for (Collection<Integer> col : exp) + assertTrue(allExp.addAll(col)); + + for (Cache.Entry<Integer, String> e : entries) { + assertNotNull(e.getKey()); + assertNotNull(e.getValue()); + assertEquals(expVal + e.getKey(), e.getValue()); + + assertTrue("Unexpected entry: " + e, allExp.remove(e.getKey())); + } + + assertTrue("Expected entries not found: " + allExp, allExp.isEmpty()); + } }