http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java new file mode 100644 index 0000000..b0b432b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Tests dht cache eviction when the key is also held in another node's near cache (near readers). + */ +public class GridCacheDhtEvictionNearReadersSelfTest extends GridCommonAbstractTest { + /** Number of grids started for the tests. */ + private static final int GRID_CNT = 4; + + /** IP finder set on the discovery SPI of each grid configuration. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Default constructor. */ + public GridCacheDhtEvictionNearReadersSelfTest() { + super(false /* don't start grid. 
*/); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setSwapEnabled(false); + cacheCfg.setEvictSynchronized(true); + cacheCfg.setEvictNearSynchronized(true); + cacheCfg.setPreloadMode(SYNC); + cacheCfg.setAtomicityMode(atomicityMode()); + cacheCfg.setDistributionMode(NEAR_PARTITIONED); + cacheCfg.setBackups(1); + + // Set eviction queue size explicitly. + cacheCfg.setEvictSynchronizedKeyBufferSize(1); + cacheCfg.setEvictMaxOverflowRatio(0); + cacheCfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(10)); + cacheCfg.setNearEvictionPolicy(new GridCacheFifoEvictionPolicy(10)); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** + * @return Atomicity mode applied to the cache configuration. 
+ */ + public GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + if (GRID_CNT < 2) + throw new IgniteCheckedException("GRID_CNT must not be less than 2."); + + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"SizeReplaceableByIsEmpty"}) + @Override protected void beforeTest() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + assert near(grid(i)).size() == 0; + assert dht(grid(i)).size() == 0; + + assert near(grid(i)).isEmpty(); + assert dht(grid(i)).isEmpty(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override protected void afterTest() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + near(grid(i)).removeAll(new IgnitePredicate[] {F.alwaysTrue()}); + + assert near(grid(i)).isEmpty() : "Near cache is not empty [idx=" + i + "]"; + assert dht(grid(i)).isEmpty() : "Dht cache is not empty [idx=" + i + "]"; + } + } + + /** + * @param node Node. + * @return Grid instance looked up by the node's id. + */ + private Ignite grid(ClusterNode node) { + return G.ignite(node.id()); + } + + /** + * @param g Grid. + * @return Internal cache of the grid, cast to the near cache adapter. + */ + @SuppressWarnings({"unchecked"}) + private GridNearCacheAdapter<Integer, String> near(Ignite g) { + return (GridNearCacheAdapter)((GridKernal)g).internalCache(); + } + + /** + * @param g Grid. + * @return Dht cache obtained from the grid's near cache adapter. + */ + @SuppressWarnings({"unchecked", "TypeMayBeWeakened"}) + private GridDhtCacheAdapter<Integer, String> dht(Ignite g) { + return ((GridNearCacheAdapter)((GridKernal)g).internalCache()).dht(); + } + + /** + * @param idx Grid index. + * @return Consistent hash affinity function taken from that grid's cache configuration. 
+ */ + private GridCacheConsistentHashAffinityFunction affinity(int idx) { + return (GridCacheConsistentHashAffinityFunction)grid(idx).cache(null).configuration().getAffinity(); + } + + /** + * @param key Key. + * @return Affinity nodes for the key's partition; the test treats the first as primary and the next as backup. + */ + private Collection<ClusterNode> keyNodes(Object key) { + GridCacheConsistentHashAffinityFunction aff = affinity(0); + + return aff.nodes(aff.partition(key), grid(0).nodes(), 1); + } + + /** + * @param nodeId Node id. + * @return Predicate for events belonging to specified node. + */ + private IgnitePredicate<IgniteEvent> nodeEvent(final UUID nodeId) { + assert nodeId != null; + + return new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent e) { + info("Predicate called [e.nodeId()=" + e.node().id() + ", nodeId=" + nodeId + ']'); + + return e.node().id().equals(nodeId); + } + }; + } + + /** + * Checks that evicting a key on the primary node also removes it from the backup node and from a near reader node. + * + * @throws Exception If failed. + */ + public void testReaders() throws Exception { + Integer key = 1; + + Collection<ClusterNode> nodes = new ArrayList<>(keyNodes(key)); + + ClusterNode primary = F.first(nodes); + + assert primary != null; + + nodes.remove(primary); + + ClusterNode backup = F.first(nodes); + + assert backup != null; + + // Now calculate other node that doesn't own the key. 
+ nodes = new ArrayList<>(grid(0).nodes()); + + nodes.remove(primary); + nodes.remove(backup); + + ClusterNode other = F.first(nodes); + + assert !F.eqNodes(primary, backup); + assert !F.eqNodes(primary, other); + assert !F.eqNodes(backup, other); + + info("Primary node: " + primary.id()); + info("Backup node: " + backup.id()); + info("Other node: " + other.id()); + + GridNearCacheAdapter<Integer, String> nearPrimary = near(grid(primary)); + GridDhtCacheAdapter<Integer, String> dhtPrimary = dht(grid(primary)); + + GridNearCacheAdapter<Integer, String> nearBackup = near(grid(backup)); + GridDhtCacheAdapter<Integer, String> dhtBackup = dht(grid(backup)); + + GridNearCacheAdapter<Integer, String> nearOther = near(grid(other)); + GridDhtCacheAdapter<Integer, String> dhtOther = dht(grid(other)); + + String val = "v1"; + + // Put on primary node. + nearPrimary.put(key, val); + + GridDhtCacheEntry<Integer, String> entryPrimary = dhtPrimary.peekExx(key); + GridDhtCacheEntry<Integer, String> entryBackup = dhtBackup.peekExx(key); + + assert entryPrimary != null; + assert entryBackup != null; + assert nearOther.peekExx(key) == null; + assert dhtOther.peekExx(key) == null; + + IgniteFuture<IgniteEvent> futOther = + waitForLocalEvent(grid(other).events(), nodeEvent(other.id()), EVT_CACHE_ENTRY_EVICTED); + + IgniteFuture<IgniteEvent> futBackup = + waitForLocalEvent(grid(backup).events(), nodeEvent(backup.id()), EVT_CACHE_ENTRY_EVICTED); + + IgniteFuture<IgniteEvent> futPrimary = + waitForLocalEvent(grid(primary).events(), nodeEvent(primary.id()), EVT_CACHE_ENTRY_EVICTED); + + // Get value on other node, it should be loaded to near cache. + assertEquals(val, nearOther.get(key, true, null)); + + entryPrimary = dhtPrimary.peekExx(key); + entryBackup = dhtBackup.peekExx(key); + + assert entryPrimary != null; + assert entryBackup != null; + + assertEquals(val, nearOther.peek(key)); + + assertTrue(!entryPrimary.readers().isEmpty()); + + // Evict on primary node. 
+ // It should trigger dht eviction plus eviction on the backup node and on the near reader node. + grid(primary).cache(null).evict(key); + + futOther.get(3000); + futBackup.get(3000); + futPrimary.get(3000); + + assertNull(dhtPrimary.peek(key)); + assertNull(nearPrimary.peek(key)); + + assertNull(dhtBackup.peek(key)); + assertNull(nearBackup.peek(key)); + + assertNull(dhtOther.peek(key)); + assertNull(nearOther.peek(key)); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionSelfTest.java new file mode 100644 index 0000000..6c311d4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionSelfTest.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Tests that evicting keys on the primary node of a near-partitioned cache also evicts them on the backup node. + */ +public class GridCacheDhtEvictionSelfTest extends GridCommonAbstractTest { + /** Number of grids started for the tests. */ + private static final int GRID_CNT = 2; + + /** IP finder set on the discovery SPI of each grid configuration. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Default constructor. */ + public GridCacheDhtEvictionSelfTest() { + super(false /* don't start grid. 
*/); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setPreloadMode(NONE); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setSwapEnabled(false); + cacheCfg.setEvictSynchronized(true); + cacheCfg.setEvictNearSynchronized(true); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(NEAR_PARTITIONED); + cacheCfg.setBackups(1); + + // Set eviction queue size explicitly. + cacheCfg.setEvictMaxOverflowRatio(0); + cacheCfg.setEvictSynchronizedKeyBufferSize(1); + cacheCfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(10000)); + cacheCfg.setNearEvictionPolicy(new GridCacheFifoEvictionPolicy(10000)); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + if (GRID_CNT < 2) + throw new IgniteCheckedException("GRID_CNT must not be less than 2."); + + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"SizeReplaceableByIsEmpty"}) + @Override protected void beforeTest() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + assert near(grid(i)).size() == 0; + assert dht(grid(i)).size() == 0; + + assert near(grid(i)).isEmpty(); + assert dht(grid(i)).isEmpty(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override protected void afterTest() throws Exception { + for (int i = 0; i < 
GRID_CNT; i++) { + near(grid(i)).removeAll(new IgnitePredicate[] {F.alwaysTrue()}); + + assert near(grid(i)).isEmpty() : "Near cache is not empty [idx=" + i + "]"; + assert dht(grid(i)).isEmpty() : "Dht cache is not empty [idx=" + i + "]"; + } + } + + /** + * @param node Node. + * @return Grid instance looked up by the node's id. + */ + private Ignite grid(ClusterNode node) { + return G.ignite(node.id()); + } + + /** + * @param g Grid. + * @return Internal cache of the grid, cast to the near cache adapter. + */ + @SuppressWarnings({"unchecked"}) + private GridNearCacheAdapter<Integer, String> near(Ignite g) { + return (GridNearCacheAdapter)((GridKernal)g).internalCache(); + } + + /** + * @param g Grid. + * @return Dht cache obtained from the grid's near cache adapter. + */ + @SuppressWarnings({"unchecked", "TypeMayBeWeakened"}) + private GridDhtCacheAdapter<Integer, String> dht(Ignite g) { + return ((GridNearCacheAdapter)((GridKernal)g).internalCache()).dht(); + } + + /** + * @param idx Grid index. + * @return Consistent hash affinity function taken from that grid's cache configuration. + */ + private GridCacheConsistentHashAffinityFunction affinity(int idx) { + return (GridCacheConsistentHashAffinityFunction)grid(idx).cache(null).configuration().getAffinity(); + } + + /** + * @param key Key. + * @return Affinity nodes for the key's partition; the tests treat the first as primary and the next as backup. + */ + private Collection<ClusterNode> keyNodes(Object key) { + GridCacheConsistentHashAffinityFunction aff = affinity(0); + + return aff.nodes(aff.partition(key), grid(0).nodes(), 1); + } + + /** + * @param nodeId Node id. + * @return Predicate for events belonging to specified node. + */ + private IgnitePredicate<IgniteEvent> nodeEvent(final UUID nodeId) { + assert nodeId != null; + + return new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent e) { + info("Predicate called [e.nodeId()=" + e.node().id() + ", nodeId=" + nodeId + ']'); + + return e.node().id().equals(nodeId); + } + }; + } + + /** + * Checks that evicting a single key on its primary node also evicts it from the backup node. + * + * @throws Exception If failed. 
+ */ + @SuppressWarnings("NullArgumentToVariableArgMethod") + public void testSingleKey() throws Exception { + Integer key = 1; + + Collection<ClusterNode> nodes = new ArrayList<>(keyNodes(key)); + + ClusterNode primary = F.first(nodes); + + assert primary != null; + + nodes.remove(primary); + + ClusterNode backup = F.first(nodes); + + assert backup != null; + + assert !F.eqNodes(primary, backup); + + info("Key primary node: " + primary.id()); + info("Key backup node: " + backup.id()); + + GridNearCacheAdapter<Integer, String> nearPrimary = near(grid(primary)); + GridDhtCacheAdapter<Integer, String> dhtPrimary = dht(grid(primary)); + + GridNearCacheAdapter<Integer, String> nearBackup = near(grid(backup)); + GridDhtCacheAdapter<Integer, String> dhtBackup = dht(grid(backup)); + + String val = "v1"; + + // Put on primary node. + nearPrimary.put(key, val, null); + + assertEquals(val, nearPrimary.peek(key)); + assertEquals(val, dhtPrimary.peek(key)); + + assertEquals(val, nearBackup.peek(key)); + assertEquals(val, dhtBackup.peek(key)); + + GridDhtCacheEntry<Integer, String> entryPrimary = dhtPrimary.peekExx(key); + GridDhtCacheEntry<Integer, String> entryBackup = dhtBackup.peekExx(key); + + assert entryPrimary != null; + assert entryBackup != null; + + assertTrue(entryPrimary.readers().isEmpty()); + assertTrue(entryBackup.readers().isEmpty()); + + IgniteFuture<IgniteEvent> futBackup = + waitForLocalEvent(grid(backup).events(), nodeEvent(backup.id()), EVT_CACHE_ENTRY_EVICTED); + + IgniteFuture<IgniteEvent> futPrimary = + waitForLocalEvent(grid(primary).events(), nodeEvent(primary.id()), EVT_CACHE_ENTRY_EVICTED); + + // Evict on primary node (assertTrue, not assert, so the eviction still runs with assertions disabled). + // It should trigger dht eviction and eviction on backup node. + assertTrue(grid(primary).cache(null).evict(key)); + + // Wait for the eviction event on backup and primary node (timeout 3000, presumably milliseconds). 
+ futBackup.get(3000); + futPrimary.get(3000); + + assertEquals(0, nearPrimary.size()); + + assertNull(nearPrimary.peekExx(key)); + assertNull(dhtPrimary.peekExx(key)); + + assertNull(nearBackup.peekExx(key)); + assertNull(dhtBackup.peekExx(key)); + } + + /** + * Checks that evicting many keys whose primary node is grid 0 also evicts them from grid 1, used here as backup. + * + * @throws Exception If failed. + */ + @SuppressWarnings("NullArgumentToVariableArgMethod") + public void testMultipleKeys() throws Exception { + final int keyCnt = 1000; + + final Ignite primaryIgnite = grid(0); + final Ignite backupIgnite = grid(1); + + GridNearCacheAdapter<Integer, String> nearPrimary = near(primaryIgnite); + GridDhtCacheAdapter<Integer, String> dhtPrimary = dht(primaryIgnite); + + GridNearCacheAdapter<Integer, String> nearBackup = near(backupIgnite); + GridDhtCacheAdapter<Integer, String> dhtBackup = dht(backupIgnite); + + Collection<Integer> keys = new ArrayList<>(keyCnt); + + for (int key = 0; keys.size() < keyCnt; key++) + if (F.eqNodes(primaryIgnite.cluster().localNode(), F.first(keyNodes(key)))) + keys.add(key); + + info("Test keys: " + keys); + + // Put on primary node. 
+ for (Integer key : keys) + nearPrimary.put(key, "v" + key, null); + + for (Integer key : keys) { + String val = "v" + key; + + assertEquals(val, nearPrimary.peek(key)); + assertEquals(val, dhtPrimary.peek(key)); + + assertEquals(val, nearBackup.peek(key)); + assertEquals(val, dhtBackup.peek(key)); + } + + final AtomicInteger cntBackup = new AtomicInteger(); + + IgniteFuture<IgniteEvent> futBackup = waitForLocalEvent(backupIgnite.events(), new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent e) { + return e.node().id().equals(backupIgnite.cluster().localNode().id()) && + cntBackup.incrementAndGet() == keyCnt; + } + }, EVT_CACHE_ENTRY_EVICTED); + + final AtomicInteger cntPrimary = new AtomicInteger(); + + IgniteFuture<IgniteEvent> futPrimary = waitForLocalEvent(primaryIgnite.events(), new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent e) { + return e.node().id().equals(primaryIgnite.cluster().localNode().id()) && + cntPrimary.incrementAndGet() == keyCnt; + } + }, EVT_CACHE_ENTRY_EVICTED); + + // Evict on primary node. + // Eviction of the last key should trigger queue processing. + for (Integer key : keys) { + boolean evicted = primaryIgnite.cache(null).evict(key); + + assert evicted; + } + + // Wait for all eviction events on backup and primary node (timeout 3000, presumably milliseconds). + futBackup.get(3000); + futPrimary.get(3000); + + info("nearBackupSize: " + nearBackup.size()); + info("dhtBackupSize: " + dhtBackup.size()); + info("nearPrimarySize: " + nearPrimary.size()); + info("dhtPrimarySize: " + dhtPrimary.size()); + + // Check backup node first. 
+ for (Integer key : keys) { + String msg = "Failed key: " + key; + + assertNull(msg, nearBackup.peek(key)); + assertNull(msg, dhtBackup.peek(key)); + assertNull(msg, nearBackup.peekExx(key)); + assertNull(msg, dhtBackup.peekExx(key)); + } + + for (Integer key : keys) { + String msg = "Failed key: " + key; + + assertNull(msg, nearPrimary.peek(key)); + assertNull(msg, dhtPrimary.peek(key)); + assertNull(msg, nearPrimary.peekExx(key)); + assertNull(msg, dhtPrimary.peekExx(key)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionsDisabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionsDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionsDisabledSelfTest.java new file mode 100644 index 0000000..2aeccf8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionsDisabledSelfTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Tests that entries put into a PARTITIONED_ONLY cache stay readable and counted (evictions disabled, per the class name). + */ +public class GridCacheDhtEvictionsDisabledSelfTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of each grid configuration. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * Default constructor. + */ + public GridCacheDhtEvictionsDisabledSelfTest() { + super(false); // Don't start grid node. + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + c.setDiscoverySpi(spi); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setName("test"); + cc.setCacheMode(GridCacheMode.PARTITIONED); + cc.setDefaultTimeToLive(0); + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(PARTITIONED_ONLY); + + c.setCacheConfiguration(cc); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** @throws Exception If failed. */ + public void testOneNode() throws Exception { + checkNodes(startGridsMultiThreaded(1)); + + assertEquals(26, colocated(0, "test").size()); + assertEquals(26, cache(0, "test").size()); + } + + /** @throws Exception If failed. 
*/ + public void testTwoNodes() throws Exception { + checkNodes(startGridsMultiThreaded(2)); + + assertTrue(colocated(0, "test").size() > 0); + assertTrue(cache(0, "test").size() > 0); + } + + /** @throws Exception If failed. */ + public void testThreeNodes() throws Exception { + checkNodes(startGridsMultiThreaded(3)); + + assertTrue(colocated(0, "test").size() > 0); + assertTrue(cache(0, "test").size() > 0); + } + + /** + * @param g Grid whose "test" cache gets keys "a".."z" put and then read back twice each. + * @throws Exception If failed. + */ + private void checkNodes(Ignite g) throws Exception { + GridCache<String, String> cache = g.cache("test"); + + for (char c = 'a'; c <= 'z'; c++) { + String key = Character.toString(c); + + cache.put(key, "val-" + key); + + String v1 = cache.get(key); + String v2 = cache.get(key); // Get second time. + + info("v1: " + v1); + info("v2: " + v2); + + assertNotNull(v1); + assertNotNull(v2); + + if (cache.affinity().mapKeyToNode(key).isLocal()) + assertSame(v1, v2); + else + assertEquals(v1, v2); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtExpiredEntriesPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtExpiredEntriesPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtExpiredEntriesPreloadSelfTest.java new file mode 100644 index 0000000..e1aabd2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtExpiredEntriesPreloadSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Tests preloading of expired entries for a PARTITIONED cache in PARTITIONED_ONLY distribution mode. + */ +public class GridCacheDhtExpiredEntriesPreloadSelfTest extends GridCacheExpiredEntriesPreloadAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtInternalEntrySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtInternalEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtInternalEntrySelfTest.java new file mode 100644 index 0000000..b20d048 --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtInternalEntrySelfTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.datastructures.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePeekMode.*; +import static 
org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Tests for internal DHT entry. + */ +public class GridCacheDhtInternalEntrySelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid count. */ + private static final int GRID_CNT = 2; + + /** Atomic long name. */ + private static final String ATOMIC_LONG_NAME = "test.atomic.long"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setPreloadMode(SYNC); + cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 2)); + cacheCfg.setBackups(0); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setDistributionMode(GridCacheDistributionMode.NEAR_PARTITIONED); + cacheCfg.setNearEvictionPolicy(new GridCacheAlwaysEvictionPolicy()); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** @throws Exception If failed. */ + public void testInternalKeyReaders() throws Exception { + IgniteBiTuple<ClusterNode, ClusterNode> nodes = getNodes(ATOMIC_LONG_NAME); + + ClusterNode primary = nodes.get1(); + ClusterNode other = nodes.get2(); + + // Create on non-primary node. 
+ GridCacheAtomicLong l = grid(other).cache(null).dataStructures().atomicLong(ATOMIC_LONG_NAME, 1, true); + + assert l != null; + assert l.get() == 1; + + check(primary, other, true); + + // Update on primary. + l = grid(primary).cache(null).dataStructures().atomicLong(ATOMIC_LONG_NAME, 1, true); + + assert l != null; + assert l.get() == 1; + + l.incrementAndGet(); + + assert l.get() == 2; + + // Check on non-primary. + l = grid(other).cache(null).dataStructures().atomicLong(ATOMIC_LONG_NAME, 1, true); + + assert l != null; + assert l.get() == 2; + + check(primary, other, true); + + // Remove (assertTrue, not assert, so the removal still runs when assertions are disabled). + assertTrue(grid(other).cache(null).dataStructures().removeAtomicLong(ATOMIC_LONG_NAME)); + + check(primary, other, false); + } + + /** + * @param primary Primary node. + * @param other Non-primary node. + * @param exists Whether entry is expected to exist. + * @throws Exception In case of error. + */ + private void check(ClusterNode primary, ClusterNode other, boolean exists) throws Exception { + if (exists) { + // Check primary node has entry in DHT cache. + assert peekNear(primary) == null; + assert peekDht(primary) != null; + + // Check non-primary node has entry in near cache. + assert peekNear(other) != null; + assert peekDht(other) == null; + + // Check primary node has reader for non-primary node. + assert peekDhtEntry(primary).readers().contains(other.id()); + } + else { + assert peekGlobal(primary) == null; + assert peekGlobal(other) == null; + } + } + + /** + * @param node Node. + * @return Atomic long value. + */ + private GridCacheAtomicLongValue peekGlobal(ClusterNode node) { + return (GridCacheAtomicLongValue)grid(node).cache(null).peek( + new GridCacheInternalKeyImpl(ATOMIC_LONG_NAME)); + } + + /** + * @param node Node. + * @return Atomic long value. + * @throws IgniteCheckedException In case of error. 
+ */ + private GridCacheAtomicLongValue peekNear(ClusterNode node) throws IgniteCheckedException { + return (GridCacheAtomicLongValue)grid(node).cache(null).peek( + new GridCacheInternalKeyImpl(ATOMIC_LONG_NAME), Collections.singleton(NEAR_ONLY)); + } + + /** + * @param node Node. + * @return Atomic long value. + * @throws IgniteCheckedException In case of error. + */ + private GridCacheAtomicLongValue peekDht(ClusterNode node) throws IgniteCheckedException { + return (GridCacheAtomicLongValue)grid(node).cache(null).peek( + new GridCacheInternalKeyImpl(ATOMIC_LONG_NAME), Collections.singleton(PARTITIONED_ONLY)); + } + + /** + * @param node Node. + * @return DHT entry. + */ + private GridDhtCacheEntry<Object, Object> peekDhtEntry(ClusterNode node) { + return (GridDhtCacheEntry<Object, Object>)dht(grid(node).cache(null)).peekEx( + new GridCacheInternalKeyImpl(ATOMIC_LONG_NAME)); + } + + /** + * @param key Key. + * @return Pair {primary node, some other node}. + */ + private IgniteBiTuple<ClusterNode, ClusterNode> getNodes(String key) { + GridCacheAffinity<Object> aff = grid(0).cache(null).affinity(); + + ClusterNode primary = aff.mapKeyToNode(key); + + assert primary != null; + + Collection<ClusterNode> nodes = new ArrayList<>(grid(0).nodes()); + + nodes.remove(primary); + + ClusterNode other = F.first(nodes); + + assert other != null; + + assert !F.eqNodes(primary, other); + + return F.t(primary, other); + } + + /** + * @param node Node. + * @return Grid. 
+ */ + private Ignite grid(ClusterNode node) { + return G.ignite(node.id()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMappingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMappingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMappingSelfTest.java new file mode 100644 index 0000000..e50d4f3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMappingSelfTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Tests dht mapping. + */ +public class GridCacheDhtMappingSelfTest extends GridCommonAbstractTest { + /** Number of key backups. */ + private static final int BACKUPS = 1; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setPreloadMode(SYNC); + cacheCfg.setBackups(BACKUPS); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(NEAR_PARTITIONED); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** @throws Exception If failed. 
*/ + public void testMapping() throws Exception { + int nodeCnt = 5; + + startGridsMultiThreaded(nodeCnt); + + GridCache<Integer, Integer> cache = grid(nodeCnt - 1).cache(null); + + int kv = 1; + + cache.put(kv, kv); + + int cnt = 0; + + for (int i = 0; i < nodeCnt; i++) { + Ignite g = grid(i); + + GridDhtCacheAdapter<Integer, Integer> dht = ((GridNearCacheAdapter<Integer, Integer>) + ((GridKernal)g).<Integer, Integer>internalCache()).dht(); + + if (dht.peek(kv) != null) { + info("Key found on node: " + g.cluster().localNode().id()); + + cnt++; + } + } + + // Test key should be on primary and backup node only. + assertEquals(1 + BACKUPS, cnt); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMultiBackupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMultiBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMultiBackupTest.java new file mode 100644 index 0000000..13b7cb0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtMultiBackupTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import java.io.*; + +/** + * + */ +public class GridCacheDhtMultiBackupTest extends GridCommonAbstractTest { + /** + * + */ + public GridCacheDhtMultiBackupTest() { + super(false /* don't start grid. */); + } + + /** + * @throws Exception If failed + */ + public void testPut() throws Exception { + try { + Ignite g = G.start("examples/config/example-cache.xml"); + + if (g.cluster().nodes().size() < 5) + U.warn(log, "Topology is too small for this test. 
" + + "Run with 4 remote nodes or more having large number of backup nodes."); + + g.compute().run(new CAX() { + @IgniteInstanceResource + private Ignite g; + + @Override public void applyx() throws IgniteCheckedException { + X.println("Checking whether cache is empty."); + + GridCache<SampleKey, SampleValue> cache = g.cache("partitioned"); + + assert cache.isEmpty(); + } + } + ); + + GridCache<SampleKey, SampleValue> cache = g.cache("partitioned"); + + int cnt = 0; + + for (int key = 0; key < 1000; key++) { + SampleKey key1 = new SampleKey(key); + + if (!g.cluster().localNode().id().equals(g.cluster().mapKeyToNode("partitioned", key1).id())) { + cache.put(key1, new SampleValue(key)); + + cnt++; + } + } + + X.println(">>> Put count: " + cnt); + } + finally { + G.stopAll(false); + } + } + + /** + * + */ + private static class SampleKey implements Serializable { + /** */ + private int key; + + /** + * @param key + */ + private SampleKey(int key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof SampleKey && ((SampleKey)obj).key == key; + } + } + + /** + * + */ + private static class SampleValue implements Serializable { + /** */ + private int val; + + /** + * @param val + */ + private SampleValue(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof SampleValue && ((SampleValue)obj).val == val; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadBigDataSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadBigDataSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadBigDataSelfTest.java new file mode 100644 index 0000000..9a7d262 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadBigDataSelfTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.cache.CacheConfiguration.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Test large cache counts. + */ +public class GridCacheDhtPreloadBigDataSelfTest extends GridCommonAbstractTest { + /** Size of each cached value in bytes (10 KB). */ + private static final int KBSIZE = 10 * 1024; + + /** Default backups. */ + private static final int DFLT_BACKUPS = 1; + + /** Partitions. */ + private static final int DFLT_PARTITIONS = 521; + + /** Preload batch size. */ + private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE; + + /** Number of key backups. Each test method can set this value as required. */ + private int backups = DFLT_BACKUPS; + + /** Preload mode. */ + private GridCachePreloadMode preloadMode = ASYNC; + + /** */ + private int preloadBatchSize = DFLT_BATCH_SIZE; + + /** Number of partitions. */ + private int partitions = DFLT_PARTITIONS; + + /** */ + private LifecycleBean lbean; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * + */ + public GridCacheDhtPreloadBigDataSelfTest() { + super(false /*start grid. 
*/); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setPreloadBatchSize(preloadBatchSize); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setPreloadMode(preloadMode); + cc.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions)); + cc.setBackups(backups); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + if (lbean != null) + c.setLifecycleBeans(lbean); + + c.setDiscoverySpi(disco); + c.setCacheConfiguration(cc); + c.setDeploymentMode(CONTINUOUS); + c.setNetworkTimeout(1000); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + backups = DFLT_BACKUPS; + partitions = DFLT_PARTITIONS; + preloadMode = ASYNC; + preloadBatchSize = DFLT_BATCH_SIZE; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + // Clean up memory for test suite. + lbean = null; + } + + /** + * @throws Exception If failed. + */ + public void testLargeObjects() throws Exception { + preloadMode = SYNC; + + try { + startGrid(0); + + int cnt = 10000; + + populate(grid(0).<Integer, byte[]>cache(null), cnt, KBSIZE); + + int gridCnt = 3; + + for (int i = 1; i < gridCnt; i++) + startGrid(i); + + Thread.sleep(10000); + + for (int i = 0; i < gridCnt; i++) { + GridCache<Integer, String> c = grid(i).cache(null); + + if (backups + 1 <= gridCnt) + assert c.size() < cnt : "Cache size: " + c.size(); + else + assert c.size() == cnt; + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testLargeObjectsWithLifeCycleBean() throws Exception { + preloadMode = SYNC; + partitions = 23; + + try { + final int cnt = 10000; + + lbean = new LifecycleBean() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { + if (evt == LifecycleEventType.AFTER_GRID_START) { + GridCache<Integer, byte[]> c = ignite.cache(null); + + if (c.putxIfAbsent(-1, new byte[1])) { + populate(c, cnt, KBSIZE); + + info(">>> POPULATED GRID <<<"); + } + } + } + }; + + int gridCnt = 3; + + for (int i = 0; i < gridCnt; i++) + startGrid(i); + + for (int i = 0; i < gridCnt; i++) + info("Grid size [i=" + i + ", size=" + grid(i).cache(null).size() + ']'); + + Thread.sleep(10000); + + for (int i = 0; i < gridCnt; i++) { + GridCache<Integer, String> c = grid(i).cache(null); + + if (backups + 1 <= gridCnt) + assert c.size() < cnt; + else + assert c.size() == cnt; + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param c Cache. + * @param cnt Key count. + * @param kbSize Size of each value in bytes. + * @throws IgniteCheckedException If failed. + */ + private void populate(GridCache<Integer, byte[]> c, int cnt, int kbSize) throws IgniteCheckedException { + for (int i = 0; i < cnt; i++) + c.put(i, value(kbSize)); + } + + /** + * @param size Array length in bytes. + * @return New byte array of the given length. + */ + private byte[] value(int size) { + return new byte[size]; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 6 * 60 * 1000; // 6 min. 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java new file mode 100644 index 0000000..0965c87 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader}. + * + * Forum example <a href="http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449"> + * http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449</a> + */ +public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { + /** Key count. */ + private static final int KEY_CNT = 100; + + /** Preload delay. */ + private static final int PRELOAD_DELAY = 5000; + + /** Preload mode. */ + private GridCachePreloadMode preloadMode = ASYNC; + + /** Preload delay. */ + private long delay = -1; + + /** IP finder. 
*/ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + assert preloadMode != null; + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cc.setPreloadMode(preloadMode); + cc.setPreloadPartitionedDelay(delay); + cc.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 128)); + cc.setBackups(1); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); + + c.setDiscoverySpi(disco); + c.setCacheConfiguration(cc); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** @throws Exception If failed. 
*/ + public void testManualPreload() throws Exception { + delay = -1; + + Ignite g0 = startGrid(0); + + int cnt = KEY_CNT; + + GridCache<String, Integer> c0 = g0.cache(null); + + for (int i = 0; i < cnt; i++) + c0.put(Integer.toString(i), i); + + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + GridCache<String, Integer> c1 = g1.cache(null); + GridCache<String, Integer> c2 = g2.cache(null); + + for (int i = 0; i < cnt; i++) + assertNull(c1.peek(Integer.toString(i))); + + for (int i = 0; i < cnt; i++) + assertNull(c2.peek(Integer.toString(i))); + + final CountDownLatch l1 = new CountDownLatch(1); + final CountDownLatch l2 = new CountDownLatch(1); + + g1.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + l1.countDown(); + + return true; + } + }, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); + + g2.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + l2.countDown(); + + return true; + } + }, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); + + info("Beginning to wait for cache1 repartition."); + + GridDhtCacheAdapter<String, Integer> d0 = dht(0); + GridDhtCacheAdapter<String, Integer> d1 = dht(1); + GridDhtCacheAdapter<String, Integer> d2 = dht(2); + + checkMaps(false, d0, d1, d2); + + // Force preload. + c1.forceRepartition(); + + l1.await(); + + info("Cache1 is repartitioned."); + + checkMaps(false, d0, d1, d2); + + info("Beginning to wait for cache2 repartition."); + + // Force preload. + c2.forceRepartition(); + + l2.await(); + + info("Cache2 is repartitioned."); + + checkMaps(true, d0, d1, d2); + + checkCache(c0, cnt); + checkCache(c1, cnt); + checkCache(c2, cnt); + } + + /** @throws Exception If failed. 
*/ + public void testDelayedPreload() throws Exception { + delay = PRELOAD_DELAY; + + Ignite g0 = startGrid(0); + + int cnt = KEY_CNT; + + GridCache<String, Integer> c0 = g0.cache(null); + + for (int i = 0; i < cnt; i++) + c0.put(Integer.toString(i), i); + + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + GridCache<String, Integer> c1 = g1.cache(null); + GridCache<String, Integer> c2 = g2.cache(null); + + for (int i = 0; i < cnt; i++) + assertNull(c1.peek(Integer.toString(i))); + + for (int i = 0; i < cnt; i++) + assertNull(c2.peek(Integer.toString(i))); + + final CountDownLatch l1 = new CountDownLatch(1); + final CountDownLatch l2 = new CountDownLatch(1); + + g1.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + l1.countDown(); + + return true; + } + }, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); + + g2.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + l2.countDown(); + + return true; + } + }, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); + + U.sleep(1000); + + GridDhtCacheAdapter<String, Integer> d0 = dht(0); + GridDhtCacheAdapter<String, Integer> d1 = dht(1); + GridDhtCacheAdapter<String, Integer> d2 = dht(2); + + info("Beginning to wait for caches repartition."); + + checkMaps(false, d0, d1, d2); + + assert l1.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS); + + assert l2.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS); + + U.sleep(1000); + + info("Caches are repartitioned."); + + checkMaps(true, d0, d1, d2); + + checkCache(c0, cnt); + checkCache(c1, cnt); + checkCache(c2, cnt); + } + + /** @throws Exception If failed. 
*/ + public void testAutomaticPreload() throws Exception { + delay = 0; + preloadMode = GridCachePreloadMode.SYNC; + + Ignite g0 = startGrid(0); + + int cnt = KEY_CNT; + + GridCache<String, Integer> c0 = g0.cache(null); + + for (int i = 0; i < cnt; i++) + c0.put(Integer.toString(i), i); + + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + GridCache<String, Integer> c1 = g1.cache(null); + GridCache<String, Integer> c2 = g2.cache(null); + + GridDhtCacheAdapter<String, Integer> d0 = dht(0); + GridDhtCacheAdapter<String, Integer> d1 = dht(1); + GridDhtCacheAdapter<String, Integer> d2 = dht(2); + + checkMaps(true, d0, d1, d2); + + checkCache(c0, cnt); + checkCache(c1, cnt); + checkCache(c2, cnt); + } + + /** @throws Exception If failed. */ + public void testAutomaticPreloadWithEmptyCache() throws Exception { + preloadMode = SYNC; + + delay = 0; + + Collection<Ignite> ignites = new ArrayList<>(); + + try { + for (int i = 0; i < 5; i++) { + ignites.add(startGrid(i)); + + awaitPartitionMapExchange(); + + for (Ignite g : ignites) { + info(">>> Checking affinity for grid: " + g.name()); + + GridDhtPartitionTopology<Integer, String> top = topology(g); + + GridDhtPartitionFullMap fullMap = top.partitionMap(true); + + for (Map.Entry<UUID, GridDhtPartitionMap> fe : fullMap.entrySet()) { + UUID nodeId = fe.getKey(); + + GridDhtPartitionMap m = fe.getValue(); + + for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) { + int p = e.getKey(); + GridDhtPartitionState state = e.getValue(); + + Collection<ClusterNode> nodes = affinityNodes(g, p); + + Collection<UUID> nodeIds = U.nodeIds(nodes); + + assert nodeIds.contains(nodeId) : "Invalid affinity mapping [nodeId=" + nodeId + + ", part=" + p + ", state=" + state + ", grid=" + G.ignite(nodeId).name() + + ", affNames=" + U.nodes2names(nodes) + ", affIds=" + nodeIds + ']'; + } + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** @throws Exception If failed. 
*/ + public void testManualPreloadSyncMode() throws Exception { + preloadMode = GridCachePreloadMode.SYNC; + delay = -1; + + try { + startGrid(0); + } + finally { + stopAllGrids(); + } + } + + /** @throws Exception If failed. */ + public void testPreloadManyNodes() throws Exception { + delay = 0; + preloadMode = ASYNC; + + startGridsMultiThreaded(9); + + U.sleep(2000); + + try { + delay = -1; + preloadMode = ASYNC; + + Ignite g = startGrid(9); + + info(">>> Starting manual preload"); + + long start = System.currentTimeMillis(); + + g.cache(null).forceRepartition().get(); + + info(">>> Finished preloading of empty cache in " + (System.currentTimeMillis() - start) + "ms."); + } + finally { + stopAllGrids(); + } + } + + /** + * @param g Grid. + * @return Topology. + */ + private GridDhtPartitionTopology<Integer, String> topology(Ignite g) { + return ((GridNearCacheAdapter<Integer, String>)((GridKernal)g).<Integer, String>internalCache()).dht().topology(); + } + + /** + * @param g Grid. + * @return Affinity. + */ + private GridCacheAffinity<Object> affinity(Ignite g) { + return g.cache(null).affinity(); + } + + /** + * @param g Grid. + * @param p Partition. + * @return Affinity nodes. + */ + private Collection<ClusterNode> affinityNodes(Ignite g, int p) { + return affinity(g).mapPartitionToPrimaryAndBackups(p); + } + + /** + * Checks if keys are present. + * + * @param c Cache. + * @param keyCnt Key count. + */ + private void checkCache(GridCache<String, Integer> c, int keyCnt) { + Ignite g = c.gridProjection().ignite(); + + for (int i = 0; i < keyCnt; i++) { + String key = Integer.toString(i); + + if (c.affinity().isPrimaryOrBackup(g.cluster().localNode(), key)) + assertEquals(Integer.valueOf(i), c.peek(key)); + } + } + + /** + * Checks maps for equality. + * + * @param strict Strict check flag. + * @param caches Maps to compare. + */ + private void checkMaps(final boolean strict, final GridDhtCacheAdapter<String, Integer>... 
caches) + throws IgniteInterruptedException { + if (caches.length < 2) + return; + + GridTestUtils.retryAssert(log, 50, 500, new CAX() { + @Override public void applyx() { + info("Checking partition maps."); + + for (int i = 0; i < caches.length; i++) + info("Partition map for node " + i + ": " + caches[i].topology().partitionMap(false).toFullString()); + + GridDhtPartitionFullMap orig = caches[0].topology().partitionMap(true); + + for (int i = 1; i < caches.length; i++) { + GridDhtPartitionFullMap cmp = caches[i].topology().partitionMap(true); + + assert orig.keySet().equals(cmp.keySet()); + + for (Map.Entry<UUID, GridDhtPartitionMap> entry : orig.entrySet()) { + UUID nodeId = entry.getKey(); + + GridDhtPartitionMap nodeMap = entry.getValue(); + + GridDhtPartitionMap cmpMap = cmp.get(nodeId); + + assert cmpMap != null; + + assert nodeMap.keySet().equals(cmpMap.keySet()); + + for (Map.Entry<Integer, GridDhtPartitionState> nodeEntry : nodeMap.entrySet()) { + GridDhtPartitionState state = cmpMap.get(nodeEntry.getKey()); + + assert state != null; + assert state != GridDhtPartitionState.EVICTED; + assert !strict || state == GridDhtPartitionState.OWNING : "Invalid partition state: " + state; + assert state == nodeEntry.getValue(); + } + } + } + } + }); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDisabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDisabledSelfTest.java new file mode 100644 index 0000000..5392dee --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDisabledSelfTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; 
+import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader}. + */ +public class GridCacheDhtPreloadDisabledSelfTest extends GridCommonAbstractTest { + /** Flat to print preloading events. */ + private static final boolean DEBUG = false; + + /** */ + private static final long TEST_TIMEOUT = 5 * 60 * 1000; + + /** Default backups. */ + private static final int DFLT_BACKUPS = 1; + + /** Partitions. */ + private static final int DFLT_PARTITIONS = 521; + + /** Number of key backups. Each test method can set this value as required. */ + private int backups = DFLT_BACKUPS; + + /** Number of partitions. */ + private int partitions = DFLT_PARTITIONS; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * + */ + public GridCacheDhtPreloadDisabledSelfTest() { + super(false /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_ASYNC); + cacheCfg.setPreloadMode(NONE); + cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions)); + cacheCfg.setBackups(backups); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(NEAR_PARTITIONED); + //cacheCfg.setPreloadThreadPoolSize(1); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + cfg.setCacheConfiguration(cacheCfg); + cfg.setDeploymentMode(CONTINUOUS); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + backups = DFLT_BACKUPS; + partitions = 
DFLT_PARTITIONS; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** + * @param i Grid index. + * @return Topology. + */ + private GridDhtPartitionTopology<Integer, String> topology(int i) { + return near(grid(i).<Integer, String>cache(null)).dht().topology(); + } + + /** @throws Exception If failed. */ + public void testSamePartitionMap() throws Exception { + backups = 1; + partitions = 10; + + int nodeCnt = 4; + + startGridsMultiThreaded(nodeCnt); + + try { + for (int p = 0; p < partitions; p++) { + List<Collection<ClusterNode>> mappings = new ArrayList<>(nodeCnt); + + for (int i = 0; i < nodeCnt; i++) { + Collection<ClusterNode> nodes = topology(i).nodes(p, -1); + List<ClusterNode> owners = topology(i).owners(p); + + int size = backups + 1; + + assert owners.size() == size : "Size mismatch [nodeIdx=" + i + ", p=" + p + ", size=" + size + + ", owners=" + F.nodeIds(owners) + ']'; + assert nodes.size() == size : "Size mismatch [nodeIdx=" + i + ", p=" + p + ", size=" + size + + ", nodes=" + F.nodeIds(nodes) + ']'; + + assert F.eqNotOrdered(nodes, owners); + assert F.eqNotOrdered(owners, nodes); + + mappings.add(owners); + } + + for (int i = 0; i < mappings.size(); i++) { + Collection<ClusterNode> m1 = mappings.get(i); + + for (int j = 0; j != i && j < mappings.size(); j++) { + Collection<ClusterNode> m2 = mappings.get(j); + + assert F.eqNotOrdered(m1, m2) : "Mappings are not equal [m1=" + F.nodeIds(m1) + ", m2=" + + F.nodeIds(m2) + ']'; + assert F.eqNotOrdered(m2, m1) : "Mappings are not equal [m1=" + F.nodeIds(m1) + ", m2=" + + F.nodeIds(m2) + ']'; + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** @throws Exception If failed. 
*/ + public void testDisabledPreloader() throws Exception { + try { + Ignite ignite1 = startGrid(0); + + GridCache<Integer, String> cache1 = ignite1.cache(null); + + int keyCnt = 10; + + putKeys(cache1, keyCnt); + + for (int i = 0; i < keyCnt; i++) { + assertNull(near(cache1).peekEx(i)); + assertNotNull((dht(cache1).peekEx(i))); + + assertEquals(Integer.toString(i), cache1.peek(i)); + } + + int nodeCnt = 3; + + List<Ignite> ignites = new ArrayList<>(nodeCnt); + + startGrids(nodeCnt, 1, ignites); + + // Check all nodes. + for (Ignite g : ignites) { + GridCache<Integer, String> c = g.cache(null); + + for (int i = 0; i < keyCnt; i++) + assertNull(c.peek(i)); + } + + Collection<Integer> keys = new LinkedList<>(); + + for (int i = 0; i < keyCnt; i++) + if (cache1.affinity().mapKeyToNode(i).equals(ignite1.cluster().localNode())) + keys.add(i); + + info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ", grids=" + + U.grids2names(ignites) + ']'); + + for (Iterator<Ignite> it = ignites.iterator(); it.hasNext(); ) { + Ignite g = it.next(); + + it.remove(); + + stopGrid(g.name()); + + // Check all nodes. + for (Ignite gg : ignites) { + GridCache<Integer, String> c = gg.cache(null); + + for (int i = 0; i < keyCnt; i++) + assertNull(c.peek(i)); + } + } + + for (Integer i : keys) + assertEquals(i.toString(), cache1.peek(i)); + } + catch (Error | Exception e) { + error("Test failed.", e); + + throw e; + } + finally { + stopAllGrids(); + } + } + + /** + * @param cnt Number of grids. + * @param startIdx Start node index. + * @param list List of started grids. + * @throws Exception If failed. 
+ */ + private void startGrids(int cnt, int startIdx, Collection<Ignite> list) throws Exception { + for (int i = 0; i < cnt; i++) { + final Ignite g = startGrid(startIdx++); + + if (DEBUG) + g.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + info("\n>>> Preload event [grid=" + g.name() + ", evt=" + evt + ']'); + + return true; + } + }, EVTS_CACHE_PRELOAD); + + list.add(g); + } + } + + /** @param grids Grids to stop. */ + private void stopGrids(Iterable<Ignite> grids) { + for (Ignite g : grids) + stopGrid(g.name()); + } + + /** + * @param c Cache. + * @param cnt Key count. + * @throws IgniteCheckedException If failed. + */ + private void putKeys(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException { + for (int i = 0; i < cnt; i++) + c.put(i, Integer.toString(i)); + } +}