http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java new file mode 100644 index 0000000..611cb21 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Affinity API tests. + */ +public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 4; + + /** */ + private static final Random RND = new Random(); + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheMode(PARTITIONED); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setBackups(1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** + * @return Affinity. + */ + private GridCacheAffinityFunction affinity() { + return ((GridKernal)grid(0)).internalCache().configuration().getAffinity(); + } + + /** + * @return Affinity mapper. + */ + private GridCacheAffinityKeyMapper affinityMapper() { + return ((GridKernal)grid(0)).internalCache().configuration().getAffinityMapper(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPartitions() throws Exception { + assertEquals(affinity().partitions(), cache().affinity().partitions()); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPartition() throws Exception { + String key = "key"; + + assertEquals(affinity().partition(key), cache().affinity().partition(key)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPrimaryPartitionsOneNode() throws Exception { + GridCacheAffinityFunctionContext ctx = + new GridCacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); + + for (ClusterNode node : grid(0).nodes()) { + int[] parts = cache().affinity().primaryPartitions(node); + + assert !F.isEmpty(parts); + + for (int p : parts) { + Collection<ClusterNode> owners = nodes(assignment, p); + + assert !F.isEmpty(owners); + + ClusterNode primary = F.first(owners); + + assert F.eqNodes(node, primary); + } + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPrimaryPartitions() throws Exception { + // Pick 2 nodes and create a projection over them. + ClusterNode n0 = grid(0).localNode(); + + int[] parts = cache().affinity().primaryPartitions(n0); + + info("Primary partitions count: " + parts.length); + + assert parts.length > 1 : "Invalid partitions: " + Arrays.toString(parts); + + for (int part : parts) + assert part >= 0; + + assert !F.isEmpty(parts); + + GridCacheAffinityFunctionContext ctx = + new GridCacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); + + for (int p : parts) { + Collection<ClusterNode> owners = nodes(assignment, p); + + assert !F.isEmpty(owners); + + ClusterNode primary = F.first(owners); + + assert F.eqNodes(n0, primary); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testBackupPartitions() throws Exception { + // Pick 2 nodes and create a projection over them. + ClusterNode n0 = grid(0).localNode(); + + // Get backup partitions without explicitly specified levels. + int[] parts = cache().affinity().backupPartitions(n0); + + assert !F.isEmpty(parts); + + GridCacheAffinityFunctionContext ctx = + new GridCacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); + + for (int p : parts) { + Collection<ClusterNode> owners = new ArrayList<>(nodes(assignment, p)); + + assert !F.isEmpty(owners); + + // Remove primary. + Iterator<ClusterNode> iter = owners.iterator(); + + iter.next(); + iter.remove(); + + assert owners.contains(n0); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAllPartitions() throws Exception { + // Pick 2 nodes and create a projection over them. + ClusterNode n0 = grid(0).localNode(); + + int[] parts = cache().affinity().allPartitions(n0); + + assert !F.isEmpty(parts); + + GridCacheAffinityFunctionContext ctx = + new GridCacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); + + for (int p : parts) { + Collection<ClusterNode> owners = nodes(assignment, p); + + assert !F.isEmpty(owners); + + assert owners.contains(n0); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMapPartitionToNode() throws Exception { + int part = RND.nextInt(affinity().partitions()); + + GridCacheAffinityFunctionContext ctx = + new GridCacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + GridCacheAffinityFunction aff = affinity(); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + assertEquals(F.first(nodes(assignment, aff, part)), cache().affinity().mapPartitionToNode(part)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMapPartitionsToNode() throws Exception { + Map<Integer, ClusterNode> map = cache().affinity().mapPartitionsToNodes(F.asList(0, 1, 5, 19, 12)); + + GridCacheAffinityFunctionContext ctx = + new GridCacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + GridCacheAffinityFunction aff = affinity(); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) + assert F.eqNodes(F.first(nodes(assignment, aff, e.getKey())), e.getValue()); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMapPartitionsToNodeArray() throws Exception { + Map<Integer, ClusterNode> map = cache().affinity().mapPartitionsToNodes(F.asList(0, 1, 5, 19, 12)); + + GridCacheAffinityFunctionContext ctx = + new GridCacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + GridCacheAffinityFunction aff = affinity(); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) + assert F.eqNodes(F.first(nodes(assignment, aff, e.getKey())), e.getValue()); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMapPartitionsToNodeCollection() throws Exception { + Collection<Integer> parts = new LinkedList<>(); + + for (int p = 0; p < affinity().partitions(); p++) + parts.add(p); + + Map<Integer, ClusterNode> map = cache().affinity().mapPartitionsToNodes(parts); + + GridCacheAffinityFunctionContext ctx = + new GridCacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + GridCacheAffinityFunction aff = affinity(); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) + assert F.eqNodes(F.first(nodes(assignment, aff, e.getKey())), e.getValue()); + } + + /** + * Gets affinity nodes for partition. + * + * @param assignments Assignments. + * @param part Partition to get affinity nodes for. + * @return Affinity nodes. + */ + private List<ClusterNode> nodes(List<List<ClusterNode>> assignments, int part) { + return assignments.get(part); + } + + /** + * Gets affinity nodes for partition. + * + * @param key Affinity key. + * @return Affinity nodes. + */ + private Iterable<ClusterNode> nodes(List<List<ClusterNode>> assignment, GridCacheAffinityFunction aff, Object key) { + return assignment.get(aff.partition(key)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testEntryPartition() throws Exception { + int keyCnt = 100; + + for (int kv = 0; kv < keyCnt; kv++) + cache().put(String.valueOf(kv), kv); + + for (int kv = 0; kv < keyCnt; kv++) { + String key = String.valueOf(kv); + + GridCacheEntry<String, Integer> entry = cache().entry(key); + + assert entry != null; + + assertEquals(affinity().partition(key), entry.partition()); + } + } + + /** + * @throws Exception If failed. + */ + public void testPartitionWithAffinityMapper() throws Exception { + GridCacheAffinityKey<Integer> key = new GridCacheAffinityKey<>(1, 2); + + int expPart = affinity().partition(affinityMapper().affinityKey(key)); + + for (int i = 0; i < gridCount(); i++) { + assertEquals(expPart, grid(i).cache(null).affinity().partition(key)); + assertEquals(expPart, grid(i).cache(null).entry(key).partition()); + } + + assertTrue(grid(0).cache(null).putx(key, 1)); + + for (int i = 0; i < gridCount(); i++) { + assertEquals(expPart, grid(i).cache(null).affinity().partition(key)); + assertEquals(expPart, grid(i).cache(null).entry(key).partition()); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityMapperSelfTest.java new file mode 100644 index 0000000..d356c6e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityMapperSelfTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * Test affinity mapper. + */ +public class GridCacheAffinityMapperSelfTest extends GridCommonAbstractTest { + /** + * + */ + public void testMethodAffinityMapper() { + GridCacheAffinityKeyMapper mapper = + new GridCacheDefaultAffinityKeyMapper(); + + List<GridCacheAffinityKey<Integer>> keys = new ArrayList<>(); + + for (int i = 1; i <= 10; i++) + keys.add(new GridCacheAffinityKey<>(i, Integer.toString(i))); + + for (int i = 1; i <= 10; i++) { + GridCacheAffinityKey<Integer> key = keys.get(i - 1); + + Object mapped = mapper.affinityKey(key); + + info("Mapped key: " + mapped); + + assertNotNull(mapped); + assertSame(key.affinityKey(), mapped); + } + } + + /** + * + */ + public void testFieldAffinityMapper() { + GridCacheAffinityKeyMapper mapper = + new GridCacheDefaultAffinityKeyMapper(); + + List<FieldAffinityKey<Integer>> keys = new ArrayList<>(); + + for (int i = 1; i <= 10; i++) + keys.add(new FieldAffinityKey<>(i, Integer.toString(i))); + + for (int i = 1; i <= 10; i++) { + FieldAffinityKey<Integer> key = keys.get(i - 1); + + Object mapped = mapper.affinityKey(key); + + info("Mapped key: " + mapped); + + assertNotNull(mapped); + assertSame(key.affinityKey(), mapped); + } + } + + /** + * + */ + public void testFieldAffinityMapperWithWrongClass() { + GridCacheAffinityKeyMapper mapper = + new GridCacheDefaultAffinityKeyMapper(); + + FieldNoAffinityKey key = new FieldNoAffinityKey(); + Object mapped = mapper.affinityKey(key); + assertEquals(key, mapped); + } + + /** + * Test key for field annotation. + */ + private static class FieldNoAffinityKey { + // No-op. + } + + /** + * Test key for field annotation. + */ + private static class FieldAffinityKey<K> { + /** Key. */ + private K key; + + /** Affinity key. */ + @GridCacheAffinityKeyMapped + private Object affKey; + + /** + * Initializes key together with its affinity key counter-part. + * + * @param key Key. + * @param affKey Affinity key. + */ + FieldAffinityKey(K key, Object affKey) { + this.key = key; + this.affKey = affKey; + } + + /** + * @return Key. + */ + public K key() { + return key; + } + + /** + * @return Affinity key. + */ + public Object affinityKey() { + return affKey; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java new file mode 100644 index 0000000..ab5f776 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java @@ -0,0 +1,689 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Affinity routing tests. + */ +public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { + /** */ + private static final int GRID_CNT = 4; + + /** */ + private static final String NON_DFLT_CACHE_NAME = "myCache"; + + /** */ + private static final int KEY_CNT = 50; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * Constructs test. + */ + public GridCacheAffinityRoutingSelfTest() { + super(/* don't start grid */ false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + if (!gridName.equals(getTestGridName(GRID_CNT))) { + // Default cache configuration. + CacheConfiguration dfltCacheCfg = defaultCacheConfiguration(); + + dfltCacheCfg.setCacheMode(PARTITIONED); + dfltCacheCfg.setBackups(1); + dfltCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + // Non-default cache configuration. + CacheConfiguration namedCacheCfg = defaultCacheConfiguration(); + + namedCacheCfg.setCacheMode(PARTITIONED); + namedCacheCfg.setBackups(1); + namedCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + namedCacheCfg.setName(NON_DFLT_CACHE_NAME); + + cfg.setCacheConfiguration(dfltCacheCfg, namedCacheCfg); + } + else { + // No cache should be configured for extra node. + cfg.setCacheConfiguration(); + } + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + for (int i = 0; i < GRID_CNT; i++) + startGrid(i); + + assert G.allGrids().size() == GRID_CNT; + + for (int i = 0; i < KEY_CNT; i++) { + grid(0).cache(null).put(i, i); + + grid(0).cache(NON_DFLT_CACHE_NAME).put(i, i); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + for (int i = 0; i < GRID_CNT; i++) + stopGrid(i); + + assert G.allGrids().isEmpty(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAffinityRun() throws Exception { + for (int i = 0; i < KEY_CNT; i++) + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, i, new CheckRunnable(i, i)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAffinityRunComplexKey() throws Exception { + for (int i = 0; i < KEY_CNT; i++) { + AffinityTestKey key = new AffinityTestKey(i); + + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, i, new CheckRunnable(i, key)); + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, key, new CheckRunnable(i, key)); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAffinityCall() throws Exception { + for (int i = 0; i < KEY_CNT; i++) + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, i, new CheckCallable(i, i)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAffinityCallComplexKey() throws Exception { + for (int i = 0; i < KEY_CNT; i++) { + final AffinityTestKey key = new AffinityTestKey(i); + + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, i, new CheckCallable(i, key)); + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, key, new CheckCallable(i, key)); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testField() throws Exception { + // Jobs should be routed correctly in case of using load balancer. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().call(new FieldAffinityJob(i)) : + "Job was routed to a wrong node [i=" + i + "]"; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMethod() throws Exception { + // Jobs should be routed correctly in case of using load balancer. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().call(new MethodAffinityJob(i)) : + "Job was routed to a wrong node [i=" + i + "]"; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testFiledCacheName() throws Exception { + // Jobs should be routed correctly in case of using load balancer. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().call(new FieldCacheNameAffinityJob(i)) : + "Job was routed to a wrong node [i=" + i + "]"; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMethodCacheName() throws Exception { + // Jobs should be routed correctly in case of using load balancer. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().call(new MethodCacheNameAffinityJob(i)) : + "Job was routed to a wrong node [i=" + i + "]"; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMultipleAnnotationsJob() throws Exception { + try { + grid(0).compute().call(new MultipleAnnotationsJob(0)); + + fail(); + } + catch (IgniteCheckedException e) { + info("Caught expected exception: " + e); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testTask() throws Exception { + // Jobs should be routed correctly. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().execute(new OneJobTask(i), i) : + "Job was routed to a wrong node [i=" + i + "]"; + + info("Starting extra node without configured caches..."); + + assertEquals(GRID_CNT, G.allGrids().size()); + + Ignite g = startGrid(GRID_CNT); + + try { + assertEquals(GRID_CNT + 1, g.cluster().nodes().size()); + + for (int i = 0; i < KEY_CNT; i++) + assert grid(GRID_CNT).compute().execute(new OneJobTask(i), i) : + "Job was routed to a wrong node [i=" + i + "]"; + } + finally { + stopGrid(GRID_CNT); + } + } + + /** + * Test job with field annotation. + */ + private static class FieldAffinityJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridCacheAffinityKeyMapped + @GridToStringInclude + private Object affKey; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + */ + FieldAffinityJob(Object affKey) { + this.affKey = affKey; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName() == null; + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + GridCacheAffinity<Object> aff = ignite.cache(null).affinity(); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(FieldAffinityJob.class, this); + } + } + + /** + * Test job with method annotation. + */ + private static class MethodAffinityJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridToStringInclude + private Object affKey; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + */ + MethodAffinityJob(Object affKey) { + this.affKey = affKey; + } + + /** + * @return Affinity key. + */ + @GridCacheAffinityKeyMapped + public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + assert jobCtx.affinityKey().equals(affinityKey()); + assert jobCtx.cacheName() == null; + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + GridCacheAffinity<Object> aff = ignite.cache(null).affinity(); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MethodAffinityJob.class, this); + } + } + + /** + * Test job with field cache name annotation. + */ + private static class FieldCacheNameAffinityJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridToStringInclude + private Object affKey; + + /** Cache name to use affinity from. */ + @GridCacheName + private String cacheName = NON_DFLT_CACHE_NAME; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + */ + FieldCacheNameAffinityJob(Object affKey) { + this.affKey = affKey; + } + + /** + * @return Affinity key. + */ + @GridCacheAffinityKeyMapped + public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName().equals(cacheName); + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + GridCacheAffinity<Object> aff = ignite.cache(cacheName).affinity(); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(FieldCacheNameAffinityJob.class, this); + } + } + + /** + * Test job with method cache name annotation. + */ + private static class MethodCacheNameAffinityJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridToStringInclude + private Object affKey; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + */ + MethodCacheNameAffinityJob(Object affKey) { + this.affKey = affKey; + } + + /** + * @return Affinity key. + */ + @GridCacheAffinityKeyMapped + public Object affinityKey() { + return affKey; + } + + /** + * @return Cache name for affinity routing. + */ + @GridCacheName + public String cacheName() { + return NON_DFLT_CACHE_NAME; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName().equals(cacheName()); + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + GridCacheAffinity<Object> aff = ignite.cache(cacheName()).affinity(); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MethodCacheNameAffinityJob.class, this); + } + } + + /** + * Test job with method cache name annotation. + */ + private static class MultipleAnnotationsJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridToStringInclude + @GridCacheAffinityKeyMapped + private Object affKey; + + /** Duplicated affinity key. */ + @SuppressWarnings({"UnusedDeclaration"}) + @GridCacheAffinityKeyMapped + private Object affKeyDup; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** + * @param affKey Affinity key. + */ + MultipleAnnotationsJob(Object affKey) { + this.affKey = affKey; + affKeyDup = affKey; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + GridCacheAffinity<Object> aff = ignite.cache(null).affinity(); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MultipleAnnotationsJob.class, this); + } + } + + /** + * Test task that produces a single job. + */ + private static class OneJobTask extends ComputeTaskSplitAdapter<Integer, Boolean> { + /** Affinity key. */ + @GridToStringInclude + @GridCacheAffinityKeyMapped + private Object affKey; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * @param affKey Affinity key. + */ + private OneJobTask(Integer affKey) { + this.affKey = affKey; + } + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Integer arg) throws IgniteCheckedException { + return F.asList(new ComputeJobAdapter() { + @Override public Object execute() { + GridCacheAffinity<Object> aff = ignite.cache(null).affinity(); + + ClusterNode primary = aff.mapKeyToNode(affKey); + + if (log.isInfoEnabled()) + log.info("Primary node for the job key [affKey=" + affKey + ", primary=" + primary.id() + "]"); + + return F.eqNodes(ignite.cluster().localNode(), primary); + } + }); + } + + /** {@inheritDoc} */ + @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return results.get(0).getData(); + } + } + + /** + * Test key. + */ + private static class AffinityTestKey { + /** Affinity key. */ + @GridCacheAffinityKeyMapped + private final int affKey; + + /** + * @param affKey Affinity key. + */ + private AffinityTestKey(int affKey) { + this.affKey = affKey; + } + + /** + * @return Affinity key. + */ + public int affinityKey() { + return affKey; + } + } + + /** + * Test runnable. + */ + private static class CheckRunnable extends CAX { + /** Affinity key. */ + private final Object affKey; + + /** Key. */ + private final Object key; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + * @param key Key. + */ + private CheckRunnable(Object affKey, Object key) { + this.affKey = affKey; + this.key = key; + } + + /** {@inheritDoc} */ + @Override public void applyx() throws IgniteCheckedException { + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, affKey).id()); + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, key).id()); + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName().equals(NON_DFLT_CACHE_NAME); + } + } + + /** + * Test callable. + */ + private static class CheckCallable implements IgniteCallable<Object> { + /** Affinity key. */ + private final Object affKey; + + /** Key. */ + private final Object key; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + * @param key Key. + */ + private CheckCallable(Object affKey, Object key) { + this.affKey = affKey; + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Object call() throws IgniteCheckedException { + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, affKey).id()); + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, key).id()); + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName().equals(NON_DFLT_CACHE_NAME); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAlwaysEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAlwaysEvictionPolicy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAlwaysEvictionPolicy.java new file mode 100644 index 0000000..8067926 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAlwaysEvictionPolicy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; + +/** + * Cache eviction policy that expires every entry essentially keeping the cache empty. + * This eviction policy can be used whenever one cache is used to front another + * and its size should be kept at {@code 0}. + */ +public class GridCacheAlwaysEvictionPolicy<K, V> implements GridCacheEvictionPolicy<K, V> { + /** {@inheritDoc} */ + @Override public void onEntryAccessed(boolean rmv, GridCacheEntry<K, V> entry) { + if (!rmv && entry.isCached()) + entry.evict(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAsyncOperationsLimitSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAsyncOperationsLimitSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAsyncOperationsLimitSelfTest.java new file mode 100644 index 0000000..645f245 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAsyncOperationsLimitSelfTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.concurrent.atomic.*; + +/** + * Checks that number of concurrent asynchronous operations is limited when configuration parameter is set. + */ +public class GridCacheAsyncOperationsLimitSelfTest extends GridCacheAbstractSelfTest { + /** */ + public static final int MAX_CONCURRENT_ASYNC_OPS = 50; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cCfg = super.cacheConfiguration(gridName); + + cCfg.setMaxConcurrentAsyncOperations(MAX_CONCURRENT_ASYNC_OPS); + + return cCfg; + } + + /** + * @throws Exception If failed. + */ + public void testAsyncOps() throws Exception { + final AtomicInteger cnt = new AtomicInteger(); + final GridAtomicInteger max = new GridAtomicInteger(); + + for (int i = 0; i < 5000; i++) { + final int i0 = i; + + cnt.incrementAndGet(); + + IgniteFuture<Boolean> fut = cache().putxAsync("key" + i, i); + + fut.listenAsync(new CI1<IgniteFuture<Boolean>>() { + @Override public void apply(IgniteFuture<Boolean> t) { + cnt.decrementAndGet(); + + max.setIfGreater(cnt.get()); + + if (i0 > 0 && i0 % 100 == 0) + info("cnt: " + cnt.get()); + } + }); + + assertTrue("Maximum number of permits exceeded: " + max.get(), max.get() <= 51); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java new file mode 100644 index 0000000..d4809f8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests messages being sent between nodes in ATOMIC mode. + */ +public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest { + /** VM ip finder for TCP discovery. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Starting grid index. */ + private int idx; + + /** Partition distribution mode. */ + private GridCacheDistributionMode partDistMode; + + /** Write sync mode. */ + private GridCacheAtomicWriteOrderMode writeOrderMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration cCfg = new CacheConfiguration(); + + cCfg.setCacheMode(PARTITIONED); + cCfg.setBackups(1); + cCfg.setWriteSynchronizationMode(FULL_SYNC); + cCfg.setDistributionMode(partDistMode); + cCfg.setAtomicWriteOrderMode(writeOrderMode); + + if (idx == 0) + cCfg.setDistributionMode(partDistMode); + else + cCfg.setDistributionMode(PARTITIONED_ONLY); + + idx++; + + cfg.setCacheConfiguration(cCfg); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedClock() throws Exception { + checkMessages(PARTITIONED_ONLY, CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedPrimary() throws Exception { + checkMessages(PARTITIONED_ONLY, PRIMARY); + } + + /** + * @throws Exception If failed. + */ + public void testClientClock() throws Exception { + checkMessages(CLIENT_ONLY, CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testClientPrimary() throws Exception { + checkMessages(CLIENT_ONLY, PRIMARY); + } + + /** + * @param distMode Distribution mode. + * @param orderMode Write ordering mode. + * @throws Exception If failed. + */ + protected void checkMessages(GridCacheDistributionMode distMode, + GridCacheAtomicWriteOrderMode orderMode) throws Exception { + + partDistMode = distMode; + writeOrderMode = orderMode; + + startGrids(4); + + try { + awaitPartitionMapExchange(); + + TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi(); + + commSpi.registerMessage(GridNearAtomicUpdateRequest.class); + commSpi.registerMessage(GridDhtAtomicUpdateRequest.class); + + int putCnt = 15; + + int expNearCnt = 0; + int expDhtCnt = 0; + + for (int i = 0; i < putCnt; i++) { + ClusterNode locNode = grid(0).localNode(); + + if (writeOrderMode == CLOCK) { + if (cache(0).affinity().isPrimary(locNode, i) || cache(0).affinity().isBackup(locNode, i)) + expNearCnt++; + else + expNearCnt += 2; + } + else { + if (cache(0).affinity().isPrimary(locNode, i)) + expDhtCnt++; + else + expNearCnt ++; + } + + cache(0).put(i, i); + } + + assertEquals(expNearCnt, commSpi.messageCount(GridNearAtomicUpdateRequest.class)); + assertEquals(expDhtCnt, commSpi.messageCount(GridDhtAtomicUpdateRequest.class)); + + if (writeOrderMode == CLOCK) { + for (int i = 1; i < 4; i++) { + commSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi(); + + assertEquals(0, commSpi.messageCount(GridNearAtomicUpdateRequest.class)); + assertEquals(0, commSpi.messageCount(GridDhtAtomicUpdateRequest.class)); + } + } + else { + for (int i = 1; i < 4; i++) { + commSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi(); + + assertEquals(0, commSpi.messageCount(GridNearAtomicUpdateRequest.class)); + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * Test communication SPI. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Counters map. */ + private Map<Class<?>, AtomicInteger> cntMap = new HashMap<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) + throws IgniteSpiException { + AtomicInteger cntr = cntMap.get(((GridIoMessage)msg).message().getClass()); + + if (cntr != null) + cntr.incrementAndGet(); + + super.sendMessage(node, msg); + } + + /** + * Registers message for counting. + * + * @param cls Class to count. + */ + public void registerMessage(Class<?> cls) { + AtomicInteger cntr = cntMap.get(cls); + + if (cntr == null) + cntMap.put(cls, new AtomicInteger()); + } + + /** + * @param cls Message type to get count. + * @return Number of messages of given class. + */ + public int messageCount(Class<?> cls) { + AtomicInteger cntr = cntMap.get(cls); + + return cntr == null ? 0 : cntr.get(); + } + + /** + * Resets counter to zero. + */ + public void resetCount() { + cntMap.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicApiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicApiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicApiAbstractTest.java new file mode 100644 index 0000000..b892230 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicApiAbstractTest.java @@ -0,0 +1,663 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.expiry.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test cases for multi-threaded tests. + */ +@SuppressWarnings("LockAcquiredButNotSafelyReleased") +public abstract class GridCacheBasicApiAbstractTest extends GridCommonAbstractTest { + /** Grid. */ + private Ignite ignite; + + /** + * + */ + protected GridCacheBasicApiAbstractTest() { + super(true /*start grid.*/); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ignite = grid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + ignite = null; + } + + /** + * + * @throws Exception If test failed. + */ + public void testBasicLock() throws Exception { + IgniteCache<Integer, String> cache = ignite.jcache(null); + + Lock lock = cache.lock(1); + + assert lock.tryLock(); + + assert cache.isLocked(1); + + lock.unlock(); + + assert !cache.isLocked(1); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testSingleLockReentry() throws IgniteCheckedException { + IgniteCache<Integer, String> cache = ignite.jcache(null); + + Lock lock = cache.lock(1); + + lock.lock(); + + try { + assert cache.isLockedByThread(1); + + lock.lock(); + + lock.unlock(); + + assert cache.isLockedByThread(1); + } + finally { + lock.unlock(); + } + + assert !cache.isLockedByThread(1); + assert !cache.isLocked(1); + } + + /** + * + * @throws Exception If test failed. + */ + public void testReentry() throws Exception { + IgniteCache<Integer, String> cache = ignite.jcache(null); + + Lock lock = cache.lock(1); + + lock.lock(); + + assert cache.isLocked(1); + assert cache.isLockedByThread(1); + + lock.lock(); + + assert cache.isLocked(1); + assert cache.isLockedByThread(1); + + lock.lock(); + + assert cache.isLocked(1); + assert cache.isLockedByThread(1); + + lock.unlock(); + + assert cache.isLocked(1); + assert cache.isLockedByThread(1); + + lock.unlock(); + + assert cache.isLocked(1); + assert cache.isLockedByThread(1); + + lock.unlock(); + + assert !cache.isLocked(1); + assert !cache.isLockedByThread(1); + } + + /** + * + */ + public void testInterruptLock() throws InterruptedException { + final IgniteCache<Integer, String> cache = ignite.jcache(null); + + cache.lock(1).lock(); + + final AtomicBoolean isOk = new AtomicBoolean(false); + + Thread t = new Thread(new Runnable() { + @Override public void run() { + assertFalse(cache.lock(1).isLockedByThread()); + + cache.lock(1).lock(); + + try { + assertTrue(cache.lock(1).isLockedByThread()); + } + finally { + cache.lock(1).unlock(); + } + + assertTrue(Thread.currentThread().isInterrupted()); + + isOk.set(true); + } + }); + + t.start(); + + Thread.sleep(100); + + t.interrupt(); + + cache.lock(1).unlock(); + + t.join(); + + assertTrue(isOk.get()); + } + + /** + * + */ + public void testInterruptLockWithTimeout() throws InterruptedException { + final IgniteCache<Integer, String> cache = ignite.jcache(null); + + cache.lock(2).lock(); + + final AtomicBoolean isOk = new AtomicBoolean(false); + + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + cache.lockAll(Arrays.asList(1, 2)).tryLock(5000, MILLISECONDS); + } + catch (InterruptedException ignored) { + isOk.set(true); + } + } + }); + + t.start(); + + Thread.sleep(100); + + t.interrupt(); + + t.join(); + + cache.lock(2).unlock(); + + assertFalse(cache.lock(1).isLocked()); + assertFalse(cache.lock(2).isLocked()); + + assertTrue(isOk.get()); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testManyLockReentries() throws IgniteCheckedException { + IgniteCache<Integer, String> cache = ignite.jcache(null); + + Integer key = 1; + + cache.lock(key).lock(); + + try { + assert cache.get(key) == null; + assert cache.getAndPut(key, "1") == null; + assert "1".equals(cache.get(key)); + + assert cache.isLocked(key); + assert cache.isLockedByThread(key); + + cache.lock(key).lock(); + + assert cache.isLocked(key); + assert cache.isLockedByThread(key); + + try { + assert "1".equals(cache.getAndRemove(key)); + } + finally { + cache.lock(key).unlock(); + } + + assert cache.isLocked(key); + assert cache.isLockedByThread(key); + } + finally { + cache.lock(key).unlock(); + + assert !cache.isLocked(key); + assert !cache.isLockedByThread(key); + } + } + + /** + * @throws Exception If test failed. + */ + public void testLockMultithreaded() throws Exception { + final IgniteCache<Integer, String> cache = ignite.jcache(null); + + final CountDownLatch l1 = new CountDownLatch(1); + final CountDownLatch l2 = new CountDownLatch(1); + final CountDownLatch l3 = new CountDownLatch(1); + + GridTestThread t1 = new GridTestThread(new Callable<Object>() { + /** {@inheritDoc} */ + @Nullable @Override public Object call() throws Exception { + info("Before lock for.key 1"); + + cache.lock(1).lock(); + + info("After lock for key 1"); + + try { + assert cache.isLocked(1); + assert cache.isLockedByThread(1); + + l1.countDown(); + + info("Let thread2 proceed."); + + // Reentry. + assert cache.lock(1).tryLock(); + + // Nested lock. + assert cache.lock(2).tryLock(); + + l2.await(); + + cache.lock(1).unlock(); + + // Unlock in reverse order. + cache.lock(2).unlock(); + + info("Waited for latch 2"); + } + finally { + cache.lock(1).unlock(); + + info("Unlocked entry for key 1."); + } + + l3.countDown(); + + return null; + } + }); + + GridTestThread t2 = new GridTestThread(new Callable<Object>() { + /** {@inheritDoc} */ + @Nullable @Override public Object call() throws Exception { + info("Waiting for latch1..."); + + l1.await(); + + info("Latch1 released."); + + assert !cache.lock(1).tryLock(); + + if (!cache.isLocked(1)) + throw new IllegalArgumentException(); + + assert !cache.isLockedByThread(1); + + info("Tried to lock cache for key1"); + + l2.countDown(); + + info("Released latch2"); + + l3.await(); + + assert cache.lock(1).tryLock(); + + try { + info("Locked cache for key 1"); + + assert cache.isLocked(1); + assert cache.isLockedByThread(1); + + info("Checked that cache is locked for key 1"); + } + finally { + cache.lock(1).unlock(); + + info("Unlocked cache for key 1"); + } + + assert !cache.isLocked(1); + assert !cache.isLockedByThread(1); + + return null; + } + }); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + t1.checkError(); + t2.checkError(); + + assert !cache.isLocked(1); + } + + /** + * + * @throws Exception If error occur. + */ + public void testBasicOps() throws Exception { + GridCache<Integer, String> cache = ignite.cache(null); + + CountDownLatch latch = new CountDownLatch(1); + + CacheEventListener lsnr = new CacheEventListener(latch); + + try { + ignite.events().localListen(lsnr, EVTS_CACHE); + + int key = (int)System.currentTimeMillis(); + + assert !cache.containsKey(key); + + cache.put(key, "a"); + + info("Start latch wait 1"); + + latch.await(); + + info("Stop latch wait 1"); + + assert cache.containsKey(key); + + latch = new CountDownLatch(2); + + lsnr.latch(latch); + + cache.put(key, "b"); + cache.put(key, "c"); + + info("Start latch wait 2"); + + latch.await(); + + info("Stop latch wait 2"); + + assert cache.containsKey(key); + + latch = new CountDownLatch(1); + + lsnr.latch(latch); + + cache.remove(key); + + info("Start latch wait 3"); + + latch.await(); + + info("Stop latch wait 3"); + + assert !cache.containsKey(key); + } + finally { + ignite.events().stopLocalListen(lsnr, EVTS_CACHE); + } + } + + /** + * @throws Exception If error occur. + */ + public void testBasicOpsWithReentry() throws Exception { + IgniteCache<Integer, String> cache = ignite.jcache(null); + + int key = (int)System.currentTimeMillis(); + + assert !cache.containsKey(key); + + cache.lock(key).lock(); + + CountDownLatch latch = new CountDownLatch(1); + + CacheEventListener lsnr = new CacheEventListener(latch); + + try { + ignite.events().localListen(lsnr, EVTS_CACHE); + + cache.put(key, "a"); + + info("Start latch wait 1"); + + latch.await(); + + info("Stop latch wait 1"); + + assert cache.containsKey(key); + assert cache.isLockedByThread(key); + + latch = new CountDownLatch(2); + + lsnr.latch(latch); + + cache.put(key, "b"); + cache.put(key, "c"); + + info("Start latch wait 2"); + + latch.await(); + + info("Stop latch wait 2"); + + assert cache.containsKey(key); + assert cache.isLockedByThread(key); + + latch = new CountDownLatch(1); + + lsnr.latch(latch); + + cache.remove(key); + + info("Start latch wait 3"); + + latch.await(); + + info("Stop latch wait 3"); + + assert cache.isLocked(key); + } + finally { + cache.lock(key).unlock(); + + ignite.events().stopLocalListen(lsnr, EVTS_CACHE); + } + + // Entry should be evicted since allowEmptyEntries is false. + assert !cache.isLocked(key); + } + + /** + * @throws Exception If test failed. + */ + public void testMultiLocks() throws Exception { + IgniteCache<Integer, String> cache = ignite.jcache(null); + + Set<Integer> keys = new HashSet<>(); + + Collections.addAll(keys, 1, 2, 3); + + cache.lockAll(keys).lock(); + + assert cache.isLocked(1); + assert cache.isLocked(2); + assert cache.isLocked(3); + + assert cache.isLockedByThread(1); + assert cache.isLockedByThread(2); + assert cache.isLockedByThread(3); + + cache.lockAll(keys).unlock(); + + assert !cache.isLocked(1); + assert !cache.isLocked(2); + assert !cache.isLocked(3); + + assert !cache.isLockedByThread(1); + assert !cache.isLockedByThread(2); + assert !cache.isLockedByThread(3); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testGetPutRemove() throws IgniteCheckedException { + GridCache<Integer, String> cache = ignite.cache(null); + + int key = (int)System.currentTimeMillis(); + + assert cache.get(key) == null; + assert cache.put(key, "1") == null; + + String val = cache.get(key); + + assert val != null; + assert "1".equals(val); + + val = cache.remove(key); + + assert val != null; + assert "1".equals(val); + assert cache.get(key) == null; + } + + /** + * + * @throws Exception In case of error. + */ + public void testPutWithExpiration() throws Exception { + IgniteCache<Integer, String> cache = ignite.jcache(null); + + CacheEventListener lsnr = new CacheEventListener(new CountDownLatch(1)); + + ignite.events().localListen(lsnr, EVTS_CACHE); + + ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, 200L)); + + try { + int key = (int)System.currentTimeMillis(); + + cache.withExpiryPolicy(expiry).put(key, "val"); + + assert cache.get(key) != null; + + cache.withExpiryPolicy(expiry).put(key, "val"); + + Thread.sleep(500); + + assert cache.get(key) == null; + } + finally { + ignite.events().stopLocalListen(lsnr, EVTS_CACHE); + } + } + + /** + * Event listener. + */ + private class CacheEventListener implements IgnitePredicate<IgniteEvent> { + /** Wait latch. */ + private CountDownLatch latch; + + /** Event types. */ + private int[] types; + + /** + * @param latch Wait latch. + * @param types Event types. + */ + CacheEventListener(CountDownLatch latch, int... types) { + this.latch = latch; + this.types = types; + + if (F.isEmpty(types)) + this.types = new int[] { EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED }; + } + + /** + * @param latch New latch. + */ + void latch(CountDownLatch latch) { + this.latch = latch; + } + + /** + * Waits for latch. + * + * @throws InterruptedException If got interrupted. + */ + void await() throws InterruptedException { + latch.await(); + } + + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + info("Grid cache event: " + evt); + + if (U.containsIntArray(types, evt.type())) + latch.countDown(); + + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java new file mode 100644 index 0000000..cc0cd8c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.configuration.*; +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Basic store test. + */ +public abstract class GridCacheBasicStoreAbstractTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Cache store. */ + private static final GridCacheTestStore store = new GridCacheTestStore(); + + /** + * + */ + protected GridCacheBasicStoreAbstractTest() { + super(true /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + store.resetTimestamp(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + cache().clearAll(); + + store.reset(); + } + + /** @return Caching mode. */ + protected abstract GridCacheMode cacheMode(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(cacheMode()); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setSwapEnabled(false); + cc.setAtomicityMode(atomicityMode()); + cc.setDistributionMode(distributionMode()); + cc.setPreloadMode(SYNC); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * @return Distribution mode. + */ + protected GridCacheDistributionMode distributionMode() { + return NEAR_PARTITIONED; + } + + /** + * @return Cache atomicity mode. + */ + protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testNotExistingKeys() throws IgniteCheckedException { + GridCache<Integer, String> cache = cache(); + Map<Integer, String> map = store.getMap(); + + cache.put(100, "hacuna matata"); + assertEquals(1, map.size()); + + cache.evict(100); + assertEquals(1, map.size()); + + assertEquals("hacuna matata", cache.remove(100)); + assertTrue(map.isEmpty()); + + store.resetLastMethod(); + assertNull(store.getLastMethod()); + + cache.remove(200); + assertEquals("remove", store.getLastMethod()); + + cache.get(300); + assertEquals("load", store.getLastMethod()); + } + + /** @throws Exception If test fails. */ + public void testWriteThrough() throws Exception { + GridCache<Integer, String> cache = cache(); + + Map<Integer, String> map = store.getMap(); + + assert map.isEmpty(); + + if (atomicityMode() == TRANSACTIONAL) { + try (IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ)) { + for (int i = 1; i <= 10; i++) { + cache.putx(i, Integer.toString(i)); + + checkLastMethod(null); + } + + tx.commit(); + } + } + else { + Map<Integer, String> putMap = new HashMap<>(); + + for (int i = 1; i <= 10; i++) + putMap.put(i, Integer.toString(i)); + + cache.putAll(putMap); + } + + checkLastMethod("putAll"); + + assert cache.size() == 10; + + for (int i = 1; i <= 10; i++) { + String val = map.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + store.resetLastMethod(); + + if (atomicityMode() == TRANSACTIONAL) { + try (IgniteTx tx = cache.txStart()) { + for (int i = 1; i <= 10; i++) { + String val = cache.remove(i); + + checkLastMethod(null); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + tx.commit(); + + checkLastMethod("removeAll"); + } + } + else { + Collection<Integer> keys = new ArrayList<>(10); + + for (int i = 1; i <= 10; i++) + keys.add(i); + + cache.removeAll(keys); + + checkLastMethod("removeAll"); + } + + assert map.isEmpty(); + } + + /** @throws Exception If test failed. */ + public void testReadThrough() throws Exception { + GridCache<Integer, String> cache = cache(); + + Map<Integer, String> map = store.getMap(); + + assert map.isEmpty(); + + if (atomicityMode() == TRANSACTIONAL) { + try (IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ)) { + for (int i = 1; i <= 10; i++) + cache.putx(i, Integer.toString(i)); + + checkLastMethod(null); + + tx.commit(); + } + } + else { + Map<Integer, String> putMap = new HashMap<>(); + + for (int i = 1; i <= 10; i++) + putMap.put(i, Integer.toString(i)); + + cache.putAll(putMap); + } + + checkLastMethod("putAll"); + + for (int i = 1; i <= 10; i++) { + String val = map.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + cache.clearAll(); + + assert cache.isEmpty(); + assert cache.isEmpty(); + + assert map.size() == 10; + + for (int i = 1; i <= 10; i++) { + // Read through. + String val = cache.get(i); + + checkLastMethod("load"); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + assert cache.size() == 10; + + cache.clearAll(); + + assert cache.isEmpty(); + assert cache.isEmpty(); + + assert map.size() == 10; + + Collection<Integer> keys = new ArrayList<>(); + + for (int i = 1; i <= 10; i++) + keys.add(i); + + // Read through. + Map<Integer, String> vals = cache.getAll(keys); + + checkLastMethod("loadAll"); + + assert vals != null; + assert vals.size() == 10; + + for (int i = 1; i <= 10; i++) { + String val = vals.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + // Write through. + cache.removeAll(keys); + + checkLastMethod("removeAll"); + + assert cache.isEmpty(); + assert cache.isEmpty(); + + assert map.isEmpty(); + } + + /** @throws Exception If test failed. */ + public void testLoadCache() throws Exception { + GridCache<Integer, String> cache = cache(); + + int cnt = 1; + + cache.loadCache(null, 0, cnt); + + checkLastMethod("loadAllFull"); + + assert !cache.isEmpty(); + + Map<Integer, String> map = cache.getAll(cache.keySet()); + + assert map.size() == cnt : "Invalid map size: " + map.size(); + + // Recheck last method to make sure + // values were read from cache. + checkLastMethod("loadAllFull"); + + int start = store.getStart(); + + for (int i = start; i < start + cnt; i++) { + String val = map.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + } + + /** @throws Exception If test failed. */ + public void testLoadCacheWithPredicate() throws Exception { + GridCache<Integer, String> cache = cache(); + + int cnt = 10; + + cache.loadCache(new P2<Integer, String>() { + @Override public boolean apply(Integer key, String val) { + // Accept only even numbers. + return key % 2 == 0; + } + }, 0, cnt); + + checkLastMethod("loadAllFull"); + + Map<Integer, String> map = cache.getAll(cache.keySet()); + + assert map.size() == cnt / 2 : "Invalid map size: " + map.size(); + + // Recheck last method to make sure + // values were read from cache. + checkLastMethod("loadAllFull"); + + int start = store.getStart(); + + for (int i = start; i < start + cnt; i++) { + String val = map.get(i); + + if (i % 2 == 0) { + assert val != null; + assert val.equals(Integer.toString(i)); + } + else + assert val == null; + } + } + + /** @throws Exception If test failed. */ + public void testReloadCache() throws Exception { + GridCache<Integer, String> cache = cache(); + + cache.loadCache(null, 0, 0); + + assert cache.isEmpty(); + + checkLastMethod("loadAllFull"); + + for (int i = 1; i <= 10; i++) { + cache.put(i, Integer.toString(i)); + + checkLastMethod("put"); + } + + assert cache.size() == 10; + + cache.reloadAll(); + + checkLastMethod("loadAll"); + + assert cache.size() == 10; + + store.resetLastMethod(); + + for (int i = 1; i <= 10; i++) { + String val = cache.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + + // Make sure that value is coming from cache, not from store. + checkLastMethod(null); + } + + cache.clearAll(); + + cache.loadCache(new P2<Integer, String>() { + @Override public boolean apply(Integer k, String v) { + // Only accept even numbers. + return k % 2 == 0; + } + }, 0, 10); + + checkLastMethod("loadAllFull"); + + store.resetLastMethod(); + + assertEquals(5, cache.size()); + + cache.forEach(new CIX1<GridCacheEntry<Integer, String>>() { + @Override public void applyx(GridCacheEntry<Integer, String> entry) throws IgniteCheckedException { + String val = entry.get(); + + assert val != null; + assert val.equals(Integer.toString(entry.getKey())); + assert entry.getKey() % 2 == 0; + + // Make sure that value is coming from cache, not from store. + checkLastMethod(null); + } + }); + + // Make sure that value is coming from cache, not from store. + checkLastMethod(null); + } + + /** @throws Exception If test failed. */ + public void testReloadAll() throws Exception { + GridCache<Integer, String> cache = cache(); + + assert cache.isEmpty(); + + Map<Integer, String> vals = new HashMap<>(); + + for (int i = 1; i <= 10; i++) + vals.put(i, Integer.toString(i)); + + cache.reloadAll(vals.keySet()); + + assert cache.isEmpty() : "Cache is not empty: " + cache.values(); + + checkLastMethod("loadAll"); + + cache.putAll(vals); + + checkLastMethod("putAll"); + + assert cache.size() == 10; + + cache.reloadAll(vals.keySet()); + + checkLastMethod("loadAll"); + + assert cache.size() == 10; + + store.resetLastMethod(); + + for (int i = 1; i <= 10; i++) { + String val = cache.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + + // Make sure that value is coming from cache, not from store. + checkLastMethod(null); + } + + for (int i = 1; i <= 10; i++) + store.write(new CacheEntryImpl<>(i, "reloaded-" + i)); + + cache.reloadAll(vals.keySet()); + + checkLastMethod("loadAll"); + + store.resetLastMethod(); + + assert cache.size() == 10; + + for (int i = 1; i <= 10; i++) { + String val = cache.get(i); + + assert val != null; + assert val.equals("reloaded-" + i); + + // Make sure that value is coming from cache, not from store. + checkLastMethod(null); + } + } + + /** @throws Exception If test failed. */ + @SuppressWarnings("StringEquality") + public void testReload() throws Exception { + GridCache<Integer, String> cache = cache(); + + assert cache.isEmpty(); + + Map<Integer, String> vals = new HashMap<>(); + + for (int i = 1; i <= 10; i++) + vals.put(i, Integer.toString(i)); + + cache.reloadAll(vals.keySet()); + + assert cache.isEmpty(); + + checkLastMethod("loadAll"); + + cache.putAll(vals); + + checkLastMethod("putAll"); + + assert cache.size() == 10; + + String val = cache.reload(1); + + assert val != null; + assert "1".equals(val); + + checkLastMethod("load"); + + assert cache.size() == 10; + + store.resetLastMethod(); + + for (int i = 1; i <= 10; i++) { + val = cache.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + + // Make sure that value is coming from cache, not from store. + checkLastMethod(null); + } + + for (int i = 1; i <= 10; i++) + store.write(new CacheEntryImpl<>(i, "reloaded-" + i)); + + store.resetLastMethod(); + + assert cache.size() == 10; + + for (int i = 1; i <= 10; i++) { + val = cache.reload(i); + + checkLastMethod("load"); + + assert val != null; + assert val.equals("reloaded-" + i); + + store.resetLastMethod(); + + String cached = cache.get(i); + + assert cached != null; + + assert cached == val : "Cached value mismatch [expected=" + val + ", cached=" + cached + ']'; + + // Make sure that value is coming from cache, not from store. + checkLastMethod(null); + } + } + + /** @param mtd Expected last method value. */ + private void checkLastMethod(@Nullable String mtd) { + String lastMtd = store.getLastMethod(); + + if (mtd == null) + assert lastMtd == null : "Last method must be null: " + lastMtd; + else { + assert lastMtd != null : "Last method must be not null"; + assert lastMtd.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + lastMtd + ']'; + } + } +}