http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationValidationSelfTest.java
new file mode 100644
index 0000000..8564481
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationValidationSelfTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.GridCacheMemoryMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Attribute validation self test.
+ * <p>
+ * The grid name passed to {@link #getConfiguration(String)} selects which (deliberately broken) cache
+ * configuration the started node receives; see the {@code *_GRID_NAME} constants.
+ */
+public class GridCacheConfigurationValidationSelfTest extends GridCommonAbstractTest {
+    /** Name of the non-default cache. */
+    private static final String NON_DFLT_CACHE_NAME = "non-dflt-cache";
+
+    /** Grid name marker: default cache gets {@code SYNC} preload mode instead of {@code ASYNC}. */
+    private static final String WRONG_PRELOAD_MODE_GRID_NAME = "preloadModeCheckFails";
+
+    /** Grid name marker: default cache gets {@code REPLICATED} cache mode instead of {@code PARTITIONED}. */
+    private static final String WRONG_CACHE_MODE_GRID_NAME = "cacheModeCheckFails";
+
+    /** Grid name marker: default cache gets an affinity function of a different class. */
+    private static final String WRONG_AFFINITY_GRID_NAME = "cacheAffinityCheckFails";
+
+    /** Grid name marker: default cache gets an affinity key mapper of a different class. */
+    private static final String WRONG_AFFINITY_MAPPER_GRID_NAME = "cacheAffinityMapperCheckFails";
+
+    /** Grid name marker: default cache gets {@code OFFHEAP_VALUES} memory mode. */
+    private static final String WRONG_OFF_HEAP_GRID_NAME = "cacheOhhHeapCheckFails";
+
+    /** Grid name marker: the named cache configuration is registered twice. */
+    private static final String DUP_CACHES_GRID_NAME = "duplicateCachesCheckFails";
+
+    /** Grid name marker: the default cache configuration is registered twice. */
+    private static final String DUP_DFLT_CACHES_GRID_NAME = "duplicateDefaultCachesCheckFails";
+
+    /** IP finder shared by the discovery SPI of every grid started by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Constructs test.
+     */
+    public GridCacheConfigurationValidationSelfTest() {
+        super(/* don't start grid */ false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        // Default cache config.
+        CacheConfiguration dfltCacheCfg = defaultCacheConfiguration();
+
+        dfltCacheCfg.setCacheMode(PARTITIONED);
+        dfltCacheCfg.setPreloadMode(ASYNC);
+        dfltCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        dfltCacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction());
+
+        // Non-default cache configuration.
+        CacheConfiguration namedCacheCfg = defaultCacheConfiguration();
+
+        namedCacheCfg.setCacheMode(PARTITIONED);
+        namedCacheCfg.setPreloadMode(ASYNC);
+        namedCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        namedCacheCfg.setName(NON_DFLT_CACHE_NAME);
+        namedCacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction());
+
+        // Modify cache config according to test parameters.
+        if (gridName.contains(WRONG_PRELOAD_MODE_GRID_NAME))
+            dfltCacheCfg.setPreloadMode(SYNC);
+        else if (gridName.contains(WRONG_CACHE_MODE_GRID_NAME))
+            dfltCacheCfg.setCacheMode(REPLICATED);
+        else if (gridName.contains(WRONG_AFFINITY_GRID_NAME)) {
+            dfltCacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction() {
+                // No-op. Just to have another class name.
+            });
+        }
+        else if (gridName.contains(WRONG_AFFINITY_MAPPER_GRID_NAME)) {
+            dfltCacheCfg.setAffinityMapper(new GridCacheDefaultAffinityKeyMapper() {
+                // No-op. Just to have another class name.
+            });
+        }
+        else if (gridName.contains(WRONG_OFF_HEAP_GRID_NAME))
+            dfltCacheCfg.setMemoryMode(OFFHEAP_VALUES);
+
+        if (gridName.contains(DUP_CACHES_GRID_NAME))
+            cfg.setCacheConfiguration(namedCacheCfg, namedCacheCfg);
+        else if (gridName.contains(DUP_DFLT_CACHES_GRID_NAME))
+            cfg.setCacheConfiguration(dfltCacheCfg, dfltCacheCfg);
+        else
+            // Normal configuration.
+            cfg.setCacheConfiguration(dfltCacheCfg, namedCacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * This test method does not require remote nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDuplicateCacheConfigurations() throws Exception {
+        // This grid should not start.
+        startInvalidGrid(DUP_CACHES_GRID_NAME);
+
+        // This grid should not start.
+        startInvalidGrid(DUP_DFLT_CACHES_GRID_NAME);
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    public void testCacheAttributesValidation() throws Exception {
+        try {
+            startGrid(0);
+
+            // This grid should not start.
+            startInvalidGrid(WRONG_PRELOAD_MODE_GRID_NAME);
+
+            // This grid should not start.
+            startInvalidGrid(WRONG_CACHE_MODE_GRID_NAME);
+
+            // This grid should not start.
+            startInvalidGrid(WRONG_AFFINITY_GRID_NAME);
+
+            // This grid should not start.
+            startInvalidGrid(WRONG_AFFINITY_MAPPER_GRID_NAME);
+
+            // This grid will start normally.
+            startGrid(1);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvalidOffHeapConfiguration() throws Exception {
+        startInvalidGrid(WRONG_OFF_HEAP_GRID_NAME);
+    }
+
+    /**
+     * Starts grid that will fail to start due to invalid configuration.
+     * The test fails if the grid starts without throwing.
+     *
+     * @param name Name of the grid which will have invalid configuration.
+     */
+    private void startInvalidGrid(String name) {
+        try {
+            startGrid(name);
+
+            // Unlike 'assert false', JUnit's fail() also fires when the JVM runs without -ea. The error it
+            // throws is not an Exception, so the catch block below does not swallow it.
+            fail("Exception should have been thrown.");
+        }
+        catch (Exception e) {
+            info("Caught expected exception: " + e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDaemonNodeAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDaemonNodeAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDaemonNodeAbstractSelfTest.java
new file mode 100644
index 0000000..51205e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDaemonNodeAbstractSelfTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Test cache operations with daemon node.
+ * <p>
+ * Every test starts three regular nodes, then flips {@link #daemon} and starts one more node as a daemon.
+ */
+public abstract class GridCacheDaemonNodeAbstractSelfTest extends GridCommonAbstractTest {
+    /** IP finder shared by the discovery SPI of every grid started by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Daemon flag. Read by {@link #getConfiguration(String)}, so it only affects nodes started after it is set. */
+    protected boolean daemon;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Each test starts with regular (non-daemon) nodes.
+        daemon = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        c.setDaemon(daemon);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        c.setRestEnabled(false);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(cacheMode());
+        cc.setAtomicityMode(TRANSACTIONAL);
+        cc.setDistributionMode(NEAR_PARTITIONED);
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /**
+     * Returns cache mode specific for test.
+     *
+     * @return Cache mode.
+     */
+    protected abstract GridCacheMode cacheMode();
+
+    /**
+     * Puts entries one by one and in a batch without starting a transaction explicitly, then reads all of
+     * them back through the same cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testImplicit() throws Exception {
+        try {
+            startGridsMultiThreaded(3);
+
+            daemon = true;
+
+            startGrid(4);
+
+            GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+            for (int i = 0; i < 30; i++)
+                cache.put(i, i);
+
+            Map<Integer, Integer> batch = new HashMap<>();
+
+            for (int i = 30; i < 60; i++)
+                batch.put(i, i);
+
+            cache.putAll(batch);
+
+            for (int i = 0; i < 60; i++)
+                assertEquals(i, (int)cache.get(i));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Same scenario as {@link #testImplicit()}, but every put and the batch put run inside an explicitly
+     * started {@code PESSIMISTIC}/{@code REPEATABLE_READ} transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testExplicit() throws Exception {
+        try {
+            startGridsMultiThreaded(3);
+
+            daemon = true;
+
+            startGrid(4);
+
+            GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+            for (int i = 0; i < 30; i++) {
+                try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(i, i);
+
+                    tx.commit();
+                }
+            }
+
+            Map<Integer, Integer> batch = new HashMap<>();
+
+            for (int i = 30; i < 60; i++)
+                batch.put(i, i);
+
+            try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.putAll(batch);
+                tx.commit();
+            }
+
+            for (int i = 0; i < 60; i++)
+                assertEquals(i, (int)cache.get(i));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Test mapKeyToNode() method for normal and daemon nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMapKeyToNode() throws Exception {
+        try {
+            // Start normal nodes.
+            Ignite g1 = startGridsMultiThreaded(3);
+
+            // Start daemon node.
+            daemon = true;
+
+            Ignite g2 = startGrid(4);
+
+            // Keys are 0, 1, 3, 7, ... (2^k - 1), stopping once the value reaches Integer.MAX_VALUE.
+            for (long i = 0; i < Integer.MAX_VALUE; i = (i << 1) + 1) {
+                ClusterNode n;
+
+                // Call mapKeyToNode for normal node.
+                assertNotNull(n = g1.cluster().mapKeyToNode(null, i));
+
+                // Call mapKeyToNode for daemon node.
+                // Only in PARTITIONED mode the daemon is required to resolve the very same node.
+                if (cacheMode() == PARTITIONED)
+                    assertEquals(n, g2.cluster().mapKeyToNode(null, i));
+                else
+                    assertNotNull(g2.cluster().mapKeyToNode(null, i));
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentOffHeapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentOffHeapSelfTest.java
new file mode 100644
index 0000000..7a9acf2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentOffHeapSelfTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMemoryMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Tests deployment with off-heap storage.
+ * <p>
+ * Reuses the tests of the parent class and only overrides the cache configuration.
+ */
+public class GridCacheDeploymentOffHeapSelfTest extends GridCacheDeploymentSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration() throws Exception {
+        CacheConfiguration cacheCfg = super.cacheConfiguration();
+
+        cacheCfg.setQueryIndexEnabled(false);
+        cacheCfg.setMemoryMode(OFFHEAP_VALUES);
+        // NOTE(review): 0 presumably means "no limit" for off-heap memory - confirm against CacheConfiguration.
+        cacheCfg.setOffHeapMaxMemory(0);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        return cacheCfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java
new file mode 100644
index 0000000..dc244bb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.lang.reflect.*;
+import java.util.*;
+
+import static org.apache.ignite.configuration.IgniteDeploymentMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.IgniteTxIsolation.REPEATABLE_READ;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Cache + Deployment test.
+ * <p>
+ * Task, key and value classes are loaded by name through the external class loader, so they are not
+ * referenced at compile time.
+ */
+public class GridCacheDeploymentSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name for grid without cache. */
+    private static final String GRID_NAME = "grid-no-cache";
+
+    /** First test task name. */
+    private static final String TEST_TASK_1 = "org.gridgain.grid.tests.p2p.GridCacheDeploymentTestTask1";
+
+    /** Second test task name. */
+    private static final String TEST_TASK_2 = "org.gridgain.grid.tests.p2p.GridCacheDeploymentTestTask2";
+
+    /** Third test task name. */
+    private static final String TEST_TASK_3 = "org.gridgain.grid.tests.p2p.GridCacheDeploymentTestTask3";
+
+    /** Test key. */
+    private static final String TEST_KEY = "org.gridgain.grid.tests.p2p.GridCacheDeploymentTestKey";
+
+    /** Test value 1. */
+    private static final String TEST_VALUE_1 = "org.gridgain.grid.tests.p2p.GridCacheDeploymentTestValue";
+
+    /** Test value 2. */
+    private static final String TEST_VALUE_2 = "org.gridgain.grid.tests.p2p.GridCacheDeploymentTestValue2";
+
+    /** Number of candidate keys tried by {@link #keyMappedTo(Ignite, ClusterNode)}. */
+    private static final int KEY_SEARCH_ATTEMPTS = 1000;
+
+    /** Deployment mode applied to grids started after it is set. */
+    private IgniteDeploymentMode depMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDeploymentMode(depMode);
+
+        // The grid named GRID_NAME is started without any cache.
+        if (GRID_NAME.equals(gridName))
+            cfg.setCacheConfiguration();
+        else
+            cfg.setCacheConfiguration(cacheConfiguration());
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setRestEnabled(false);
+
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     * @throws Exception In case of error.
+     */
+    protected CacheConfiguration cacheConfiguration() throws Exception {
+        CacheConfiguration cfg = defaultCacheConfiguration();
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setPreloadMode(SYNC);
+        cfg.setStoreValueBytes(true);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setDistributionMode(NEAR_PARTITIONED);
+        cfg.setBackups(1);
+
+        return cfg;
+    }
+
+    /** @throws Exception If failed. */
+    @SuppressWarnings("unchecked")
+    public void testDeployment() throws Exception {
+        try {
+            depMode = CONTINUOUS;
+
+            Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            Ignite g0 = startGrid(GRID_NAME);
+
+            ClassLoader ldr = getExternalClassLoader();
+
+            Class cls = ldr.loadClass(TEST_TASK_1);
+
+            g0.compute().execute(cls, g1.cluster().localNode());
+
+            cls = ldr.loadClass(TEST_TASK_2);
+
+            g0.compute().execute(cls, g2.cluster().localNode());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** @throws Exception If failed. */
+    @SuppressWarnings("unchecked")
+    public void testDeployment2() throws Exception {
+        try {
+            depMode = CONTINUOUS;
+
+            Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            Ignite g0 = startGrid(GRID_NAME);
+
+            ClassLoader ldr = getExternalClassLoader();
+
+            Class cls = ldr.loadClass(TEST_TASK_3);
+
+            String key = keyMappedTo(g1, g2.cluster().localNode());
+
+            g0.compute().execute(cls, new T2<>(g1.cluster().localNode(), key));
+
+            cls = ldr.loadClass(TEST_TASK_2);
+
+            g0.compute().execute(cls, g2.cluster().localNode());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** @throws Exception If failed. */
+    @SuppressWarnings("unchecked")
+    public void testDeployment3() throws Exception {
+        try {
+            depMode = SHARED;
+
+            Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            Ignite g0 = startGrid(GRID_NAME);
+
+            ClassLoader ldr = getExternalClassLoader();
+
+            Class cls = ldr.loadClass(TEST_TASK_3);
+
+            String key = keyMappedTo(g1, g2.cluster().localNode());
+
+            g0.compute().execute(cls, new T2<>(g1.cluster().localNode(), key));
+
+            stopGrid(GRID_NAME);
+
+            // Give both caches up to 10 x 500 ms to become empty after the task-owning grid has left.
+            for (int i = 0; i < 10; i++) {
+                if (g1.cache(null).isEmpty() && g2.cache(null).isEmpty())
+                    break;
+
+                U.sleep(500);
+            }
+
+            assert g1.cache(null).isEmpty();
+            assert g2.cache(null).isEmpty();
+
+            startGrid(3);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** @throws Exception If failed. */
+    @SuppressWarnings("unchecked")
+    public void testDeployment4() throws Exception {
+        try {
+            depMode = CONTINUOUS;
+
+            Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            Ignite g0 = startGrid(GRID_NAME);
+
+            info("Started grids:");
+            info("g0: " + g0.cluster().localNode().id());
+            info("g1: " + g1.cluster().localNode().id());
+            info("g2: " + g2.cluster().localNode().id());
+
+            ClassLoader ldr = getExternalClassLoader();
+
+            Class cls = ldr.loadClass(TEST_TASK_3);
+
+            String key = keyMappedTo(g1, g2.cluster().localNode());
+
+            g0.compute().execute(cls, new T2<>(g1.cluster().localNode(), key));
+
+            stopGrid(GRID_NAME);
+
+            // In CONTINUOUS mode the entry is expected to stay in both caches after the grid has left.
+            assert g1.cache(null).size() == 1;
+            assert g2.cache(null).size() == 1;
+
+            startGrid(3);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** @throws Exception If failed. */
+    @SuppressWarnings("unchecked")
+    public void testDeployment5() throws Exception {
+        ClassLoader ldr = getExternalClassLoader();
+
+        Class val1Cls = ldr.loadClass(TEST_VALUE_1);
+        Class val2Cls = ldr.loadClass(TEST_VALUE_2);
+        Class task2Cls = ldr.loadClass(TEST_TASK_2);
+
+        try {
+            depMode = SHARED;
+
+            Ignite g0 = startGrid(0);
+            Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            info(">>>>>>> Grid 0: " + g0.cluster().localNode().id());
+            info(">>>>>>> Grid 1: " + g1.cluster().localNode().id());
+            info(">>>>>>> Grid 2: " + g2.cluster().localNode().id());
+
+            int key = 0;
+
+            key = getNextKey(key, g0, g1.cluster().localNode(), g2.cluster().localNode(), g0.cluster().localNode());
+
+            info("Key: " + key);
+
+            GridCache<Object, Object> cache = g0.cache(null);
+
+            assert cache != null;
+
+            cache.put(key, Arrays.asList(val1Cls.newInstance()));
+
+            info(">>>>>>> First put completed.");
+
+            key = getNextKey(key + 1, g0, g2.cluster().localNode(), g0.cluster().localNode(), g1.cluster().localNode());
+
+            info("Key: " + key);
+
+            cache.put(key, val1Cls.newInstance());
+
+            info(">>>>>>> Second put completed.");
+
+            key = getNextKey(key + 1, g0, g1.cluster().localNode(), g2.cluster().localNode(), g0.cluster().localNode());
+
+            info("Key: " + key);
+
+            cache.put(key, val2Cls.newInstance());
+
+            info(">>>>>>> Third put completed.");
+
+            g0.compute().execute(task2Cls, g1.cluster().localNode());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** @throws Exception If failed. */
+    @SuppressWarnings("unchecked")
+    public void testDeployment6() throws Exception {
+        try {
+            depMode = SHARED;
+
+            Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            ClassLoader ldr = getExternalClassLoader();
+
+            Class cls = ldr.loadClass(TEST_TASK_3);
+
+            String key = keyMappedTo(g1, g2.cluster().localNode());
+
+            g1.compute().execute(cls, new T2<>(g2.cluster().localNode(), key));
+
+            stopGrid(1);
+
+            startGrid(3);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** @throws Exception If failed. */
+    @SuppressWarnings("unchecked")
+    public void testDeployment7() throws Exception {
+        try {
+            depMode = SHARED;
+
+            Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            ClassLoader ldr = getExternalClassLoader();
+
+            Class cls = ldr.loadClass(TEST_TASK_3);
+
+            String key = keyMappedTo(g1, g2.cluster().localNode());
+
+            g2.compute().execute(cls, new T2<>(g2.cluster().localNode(), key));
+
+            stopGrid(2);
+
+            startGrid(3);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** @throws Exception If failed. */
+    public void testPartitionedDeploymentPreloading() throws Exception {
+        ClassLoader ldr = getExternalClassLoader();
+
+        Class valCls = ldr.loadClass(TEST_VALUE_1);
+
+        try {
+            depMode = SHARED;
+
+            Ignite g = startGrid(0);
+
+            g.cache(null).put(0, valCls.newInstance());
+
+            info("Added value to cache 0.");
+
+            startGrid(1);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * NOTE(review): the leading underscore presumably keeps the JUnit 3 runner from picking this test up -
+     * confirm that it is disabled on purpose.
+     *
+     * @throws Exception If failed.
+     */
+    public void _testDeploymentGroupLock() throws Exception {
+        ClassLoader ldr = getExternalClassLoader();
+
+        Class<?> keyCls = ldr.loadClass(TEST_KEY);
+
+        try {
+            depMode = SHARED;
+
+            Ignite g1 = startGrid(1);
+            startGrid(2);
+
+            Constructor<?> constructor = keyCls.getDeclaredConstructor(String.class);
+
+            Object affKey;
+
+            int i = 0;
+
+            // Look for an affinity key that maps to the first grid.
+            do {
+                affKey = constructor.newInstance(String.valueOf(i));
+
+                i++;
+            }
+            while (!g1.cluster().mapKeyToNode(null, affKey).id().equals(g1.cluster().localNode().id()));
+
+            GridCache<Object, Object> cache = g1.cache(null);
+
+            try (IgniteTx tx = cache.txStartAffinity(affKey, PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
+                cache.put(new GridCacheAffinityKey<>("key1", affKey), "val1");
+
+                tx.commit();
+            }
+
+            assertEquals("val1", cache.get(new GridCacheAffinityKey<>("key1", affKey)));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Looks for a key of the form {@code "1" + i} that grid {@code g} maps, for the cache with {@code null}
+     * name, to the given node.
+     *
+     * @param g Grid used to resolve the key-to-node mapping.
+     * @param node Node the key has to map to.
+     * @return First matching key.
+     * @throws IllegalStateException If no key matched within {@link #KEY_SEARCH_ATTEMPTS} attempts.
+     */
+    private String keyMappedTo(Ignite g, ClusterNode node) {
+        for (int i = 0; i < KEY_SEARCH_ATTEMPTS; i++) {
+            String key = "1" + i;
+
+            if (g.cluster().mapKeyToNode(null, key).id().equals(node.id()))
+                return key;
+        }
+
+        // Continuing with a key that maps elsewhere would silently change what the calling test exercises.
+        throw new IllegalStateException("Unable to find key mapped to node [nodeId=" + node.id() +
+            ", attempts=" + KEY_SEARCH_ATTEMPTS + ']');
+    }
+
+    /**
+     * Looks for next key starting from {@code start} for which primary node is {@code primary} and backup is {@code
+     * backup}.
+     *
+     * @param start Key to start search from, inclusive.
+     * @param g Grid on which check will be performed.
+     * @param primary Expected primary node.
+     * @param backup Expected backup node.
+     * @param near Expected near node.
+     * @return Key with described properties.
+     * @throws IllegalStateException if such a key could not be found after 10000 iterations.
+     */
+    private int getNextKey(int start, Ignite g, ClusterNode primary, ClusterNode backup, ClusterNode near) {
+        info("Primary: " + primary);
+        info("Backup: " + backup);
+        info("Near: " + near);
+
+        for (int i = start; i < start + 10000; i++) {
+            if (g.cache(null).affinity().isPrimary(primary, i) && g.cache(null).affinity().isBackup(backup, i)) {
+                // The near node must be neither primary nor backup for the chosen key.
+                assert !g.cache(null).affinity().isPrimary(near, i) : "Key: " + i;
+                assert !g.cache(null).affinity().isBackup(near, i) : "Key: " + i;
+
+                return i;
+            }
+        }
+
+        throw new IllegalStateException("Unable to find matching key [start=" + start + ", primary=" + primary.id() +
+            ", backup=" + backup.id() + ']');
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java
new file mode 100644
index 0000000..897d6b5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+import static org.apache.ignite.internal.processors.cache.GridCacheVersionManager.*;
+
+/**
+ * Tests that the version of a cache entry carries the expected topology version and the order of the node
+ * the entry key maps to.
+ */
+public class GridCacheEntryVersionSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Atomicity mode. Must be set before grids are started, it is read in {@link #getConfiguration(String)}. */
+    private GridCacheAtomicityMode atomicityMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testVersionAtomic() throws Exception {
+        atomicityMode = ATOMIC;
+
+        checkVersion();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testVersionTransactional() throws Exception {
+        atomicityMode = TRANSACTIONAL;
+
+        checkVersion();
+    }
+
+    /**
+     * Writes the same entries with three and then with four nodes in topology and verifies entry versions
+     * on every node after each write.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkVersion() throws Exception {
+        startGridsMultiThreaded(3);
+
+        try {
+            Map<Integer,Integer> map = F.asMap(1, 1, 2, 2, 3, 3);
+
+            for (Integer key : map.keySet()) {
+                info("Affinity nodes [key=" + key + ", nodes=" +
+                    F.viewReadOnly(grid(0).cache(null).affinity().mapKeyToPrimaryAndBackups(key), F.node2id()) + ']');
+            }
+
+            grid(0).cache(null).putAll(map);
+
+            checkEntryVersions(map.keySet(), 3, 3);
+
+            startGrid(3);
+
+            grid(0).cache(null).putAll(map);
+
+            checkEntryVersions(map.keySet(), 4, 4);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks versions of the entries that are present on grids {@code 0 .. gridCnt - 1}. Keys that a grid does
+     * not hold an entry for are skipped on that grid.
+     *
+     * @param keys Keys to check.
+     * @param gridCnt Number of started grids to inspect.
+     * @param expTopVer Expected topology version relative to the grid start time offset.
+     * @throws Exception If failed.
+     */
+    private void checkEntryVersions(Iterable<Integer> keys, int gridCnt, int expTopVer) throws Exception {
+        for (int g = 0; g < gridCnt; g++) {
+            GridKernal grid = (GridKernal)grid(g);
+
+            for (Integer key : keys) {
+                GridCacheAdapter<Object, Object> cache = grid.internalCache();
+
+                GridCacheEntryEx<Object, Object> entry = cache.peekEx(key);
+
+                if (entry != null) {
+                    GridCacheVersion ver = entry.version();
+
+                    long order = cache.affinity().mapKeyToNode(key).order();
+
+                    // Check topology version.
+                    assertEquals(expTopVer, ver.topologyVersion() -
+                        (grid.context().discovery().gridStartTime() - TOP_VER_BASE_TIME) / 1000);
+
+                    // Check node order.
+                    assertEquals("Failed for key: " + key, order, ver.nodeOrder());
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionEventAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionEventAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionEventAbstractTest.java
new file mode 100644
index 0000000..62661a0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionEventAbstractTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*;

/**
 * Eviction event self test: checks that evicting an entry raises {@code EVT_CACHE_ENTRY_EVICTED}
 * and that the event exposes the evicted value.
 */
public abstract class GridCacheEvictionEventAbstractTest extends GridCommonAbstractTest {
    /** IP finder shared by all nodes started by this test. */
    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** Maximum time to wait for the eviction event, in milliseconds. */
    private static final long EVT_WAIT_TIMEOUT = 30000;

    /**
     * Creates the test; the {@code true} flag asks the base class to start a node.
     */
    protected GridCacheEvictionEventAbstractTest() {
        super(true); // Start node.
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration() throws Exception {
        IgniteConfiguration c = super.getConfiguration();

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(ipFinder);

        c.setDiscoverySpi(disco);

        CacheConfiguration cc = defaultCacheConfiguration();

        cc.setCacheMode(cacheMode());
        cc.setAtomicityMode(atomicityMode());
        cc.setEvictNearSynchronized(isNearEvictSynchronized());

        c.setCacheConfiguration(cc);

        c.setIncludeEventTypes(EVT_CACHE_ENTRY_EVICTED, EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);

        return c;
    }

    /**
     * @return Cache mode.
     */
    protected abstract GridCacheMode cacheMode();

    /**
     * @return Atomicity mode.
     */
    protected abstract GridCacheAtomicityMode atomicityMode();

    /**
     * @return {@code True} if near evicts synchronized.
     */
    protected boolean isNearEvictSynchronized() {
        return false;
    }

    /**
     * Puts a value, evicts it and waits for the eviction event carrying the old value.
     *
     * @throws Exception If failed.
     */
    public void testEvictionEvent() throws Exception {
        Ignite g = grid();

        final CountDownLatch latch = new CountDownLatch(1);

        final AtomicReference<String> oldVal = new AtomicReference<>();

        g.events().localListen(new IgnitePredicate<IgniteEvent>() {
            @Override public boolean apply(IgniteEvent evt) {
                IgniteCacheEvent e = (IgniteCacheEvent) evt;

                oldVal.set((String) e.oldValue());

                latch.countDown();

                return true;
            }
        }, EVT_CACHE_ENTRY_EVICTED);

        GridCache<String, String> c = g.cache(null);

        c.put("1", "val1");

        c.evict("1");

        // Bounded wait: a missing event must fail the test instead of blocking it forever.
        assertTrue("Failed to wait for eviction event.", latch.await(EVT_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));

        assertNotNull(oldVal.get());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheExAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheExAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheExAbstractFullApiSelfTest.java new file mode 100644 index 0000000..395bafb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheExAbstractFullApiSelfTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.events.IgniteEventType.*;

/**
 * Abstract test for private cache interface.
 */
public abstract class GridCacheExAbstractFullApiSelfTest extends GridCacheAbstractSelfTest {
    /** {@inheritDoc} */
    @Override protected int gridCount() {
        return 4;
    }

    /** {@inheritDoc} */
    @Override protected GridCacheDistributionMode distributionMode() {
        return PARTITIONED_ONLY;
    }

    /**
     * Reads one locally mapped key inside a pessimistic transaction, then reads another locally
     * mapped key through {@code getAllOutTx} and waits until the number of lock/unlock events
     * seen on grid 0 reaches the expected value.
     *
     * @throws Exception If failed.
     */
    public void testGetOutTx() throws Exception {
        final AtomicInteger lockEvtCnt = new AtomicInteger();

        IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() {
            @Override public boolean apply(IgniteEvent evt) {
                lockEvtCnt.incrementAndGet();

                return true;
            }
        };

        try {
            grid(0).events().localListen(lsnr, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED);

            GridCache<String, Integer> cache = cache();

            try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
                // Key read inside the transaction.
                int txKeyIdx = localKeyIndex(cache, 0, 0);

                cache.get("key" + txKeyIdx);

                // Next local key after the transactional one; stays the same if none is found.
                int outTxKeyIdx = localKeyIndex(cache, txKeyIdx + 1, txKeyIdx);

                ((GridCacheProjectionEx<String, Integer>)cache).getAllOutTx(F.asList("key" + outTxKeyIdx));
            }

            final int expEvts = nearEnabled() ? 4 : 2;

            boolean reached = GridTestUtils.waitForCondition(new PA() {
                @Override public boolean apply() {
                    info("Lock event count: " + lockEvtCnt.get());

                    return lockEvtCnt.get() == expEvts;
                }
            }, 15000);

            assertTrue(reached);
        }
        finally {
            grid(0).events().stopLocalListen(lsnr, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED);
        }
    }

    /**
     * Looks for the first index {@code i} in {@code [from, 1000)} whose key {@code "key" + i}
     * is mapped to the local node of grid 0.
     *
     * @param cache Cache whose affinity is consulted.
     * @param from First index to probe (inclusive).
     * @param dflt Value returned when no index in the range maps to the local node.
     * @return Found index or {@code dflt}.
     * @throws Exception If failed.
     */
    private int localKeyIndex(GridCache<String, Integer> cache, int from, int dflt) throws Exception {
        for (int idx = from; idx < 1000; idx++) {
            if (cache.affinity().mapKeyToNode("key" + idx).id().equals(grid(0).localNode().id()))
                return idx;
        }

        return dflt;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFieldsQueryNoDataSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFieldsQueryNoDataSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFieldsQueryNoDataSelfTest.java new file mode 100644 index 0000000..80e9a74 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFieldsQueryNoDataSelfTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;

/**
 * Test for local query on partitioned cache without data.
 */
public class GridCacheFieldsQueryNoDataSelfTest extends GridCommonAbstractTest {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        CacheConfiguration cache = defaultCacheConfiguration();

        cache.setCacheMode(PARTITIONED);
        cache.setBackups(1);
        cache.setWriteSynchronizationMode(FULL_SYNC);

        cfg.setCacheConfiguration(cache);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(IP_FINDER);

        cfg.setDiscoverySpi(disco);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGrid();
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopGrid();
    }

    /**
     * Runs a SQL fields query against the cache, into which this test never puts any data,
     * and checks that the result is a non-null, empty collection.
     *
     * @throws Exception If failed.
     */
    public void testQuery() throws Exception {
        GridCacheQuery<List<?>> qry = grid().cache(null).queries().createSqlFieldsQuery("select _VAL from Integer");

        Collection<List<?>> res = qry.execute().get();

        // JUnit assertions are used instead of the assert keyword so the checks
        // are enforced even when the JVM runs without -ea.
        assertNotNull(res);
        assertTrue(res.isEmpty());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java new file mode 100644 index 0000000..16f0d20 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*;

/**
 * Tests that partition release futures and MVCC "finish keys" futures are not completed
 * while a transaction or an explicit lock is still held, and that they complete after release.
 */
public class GridCacheFinishPartitionsSelfTest extends GridCacheAbstractSelfTest {
    /** */
    private static final int GRID_CNT = 1;

    /** Grid kernal. */
    private GridKernal grid;

    /** {@inheritDoc} */
    @Override protected int gridCount() {
        return GRID_CNT;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        grid = (GridKernal)grid(0);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        grid = null;
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration c = super.getConfiguration(gridName);

        CacheConfiguration cc = defaultCacheConfiguration();

        cc.setCacheMode(PARTITIONED);
        cc.setBackups(1);
        cc.setAtomicityMode(TRANSACTIONAL);
        cc.setDistributionMode(NEAR_PARTITIONED);

        c.setCacheConfiguration(cc);

        return c;
    }

    /**
     * @throws Exception If failed.
     */
    public void testTxFinishPartitions() throws Exception {
        String key = "key";
        String val = "value";

        GridCache<String, String> cache = grid.cache(null);

        int keyPart = grid.<String, String>internalCache().context().affinity().partition(key);

        cache.put(key, val);

        // Wait for tx-enlisted partition.
        long waitTime = runTransactions(key, keyPart, F.asList(keyPart));

        info("Wait time, ms: " + waitTime);

        // Wait for not enlisted partition.
        waitTime = runTransactions(key, keyPart, F.asList(keyPart + 1));

        info("Wait time, ms: " + waitTime);

        // Wait for both partitions.
        waitTime = runTransactions(key, keyPart, F.asList(keyPart, keyPart + 1));

        info("Wait time, ms: " + waitTime);
    }

    /**
     * Reads the key inside a pessimistic transaction, obtains the partition release future while
     * the transaction is open, asserts that the future is not done yet and commits.
     * NOTE(review): {@code waitParts} is only used in the assertion message here; the future is
     * requested with {@code GRID_CNT + 1} - confirm whether partitions were meant to be passed.
     *
     * @param key Key.
     * @param keyPart Key partition.
     * @param waitParts Partitions to wait.
     * @return Wait time.
     * @throws Exception If failed.
     */
    private long runTransactions(final String key, final int keyPart, final Collection<Integer> waitParts)
        throws Exception {
        int threadNum = 1;

        final CyclicBarrier barrier = new CyclicBarrier(threadNum);
        final CountDownLatch latch = new CountDownLatch(threadNum);

        final AtomicLong start = new AtomicLong();

        GridTestUtils.runMultiThreaded(new Callable() {
            @Override public Object call() throws Exception {
                if (barrier.await() == 0)
                    start.set(System.currentTimeMillis());

                GridCache<String, String> cache = grid(0).cache(null);

                // try-with-resources closes the transaction even if the assertion below fails,
                // so a failed check cannot leave the key locked for subsequent tests.
                try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
                    cache.get(key);

                    IgniteFuture<?> fut = grid.context().cache().context().partitionReleaseFuture(GRID_CNT + 1);

                    fut.listenAsync(new CI1<IgniteFuture<?>>() {
                        @Override public void apply(IgniteFuture<?> e) {
                            latch.countDown();
                        }
                    });

                    assert !fut.isDone() : "Failed waiting for locks " +
                        "[keyPart=" + keyPart + ", waitParts=" + waitParts + ", done=" + fut.isDone() + ']';

                    tx.commit();
                }

                return null;
            }
        }, threadNum, "test-finish-partitions-thread");

        latch.await();

        return System.currentTimeMillis() - start.get();
    }

    /**
     * Tests method {@link GridCacheMvccManager#finishLocks(org.apache.ignite.lang.IgnitePredicate, long)}.
     *
     * @throws Exception If failed.
     */
    public void testMvccFinishPartitions() throws Exception {
        String key = "key";

        int keyPart = grid.internalCache().context().affinity().partition(key);

        // Wait for tx-enlisted partition.
        long waitTime = runLock(key, keyPart, F.asList(keyPart));

        info("Wait time, ms: " + waitTime);

        // Wait for not enlisted partition.
        waitTime = runLock(key, keyPart, F.asList(keyPart + 1));

        info("Wait time, ms: " + waitTime);

        // Wait for both partitions.
        waitTime = runLock(key, keyPart, F.asList(keyPart, keyPart + 1));

        info("Wait time, ms: " + waitTime);
    }

    /**
     * Tests finish future for particular set of keys.
     *
     * @throws Exception If failed.
     */
    public void testMvccFinishKeys() throws Exception {
        GridCache<String, Integer> cache = grid(0).cache(null);

        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
            final String key = "key";

            cache.get(key);

            GridCacheAdapter<String, Integer> internal = grid.internalCache();

            IgniteFuture<?> nearFut = internal.context().mvcc().finishKeys(Collections.singletonList(key), 2);

            IgniteFuture<?> dhtFut = internal.context().near().dht().context().mvcc().finishKeys(
                Collections.singletonList(key), 2);

            assert !nearFut.isDone();
            assert !dhtFut.isDone();

            tx.commit();
        }
    }

    /**
     * Tests chained locks and partitions release future: the future is requested while the first
     * lock is held, further locks are acquired and released in an overlapping fashion, and the
     * future must stay incomplete until the last lock of the chain is released.
     *
     * @throws Exception If failed.
     */
    public void testMvccFinishPartitionsContinuousLockAcquireRelease() throws Exception {
        int key = 1;

        GridCacheSharedContext<Object, Object> ctx = grid.context().cache().context();

        final AtomicLong end = new AtomicLong(0);

        final CountDownLatch latch = new CountDownLatch(1);

        IgniteCache<Integer, String> cache = grid.jcache(null);

        cache.lock(key).lock();

        long start = System.currentTimeMillis();

        info("Start time: " + start);

        IgniteFuture<?> fut = ctx.partitionReleaseFuture(GRID_CNT + 1);

        assert fut != null;

        fut.listenAsync(new CI1<IgniteFuture<?>>() {
            @Override public void apply(IgniteFuture<?> e) {
                end.set(System.currentTimeMillis());

                latch.countDown();

                info("End time: " + end.get());
            }
        });

        cache.lock(key + 1).lock();

        cache.lock(key).unlock();

        cache.lock(key + 2).lock();

        cache.lock(key + 1).unlock();

        assert !fut.isDone() : "Failed waiting for locks";

        cache.lock(key + 2).unlock();

        latch.await();
    }

    /**
     * Locks the key, obtains the partition release future while the lock is held, asserts that
     * the future is not done, then unlocks and waits for the future listener to fire.
     *
     * @param key Key.
     * @param keyPart Key partition.
     * @param waitParts Partitions to wait.
     * @return Wait time.
     * @throws Exception If failed.
     */
    private long runLock(String key, int keyPart, Collection<Integer> waitParts) throws Exception {
        GridCacheSharedContext<Object, Object> ctx = grid.context().cache().context();

        final AtomicLong end = new AtomicLong(0);

        final CountDownLatch latch = new CountDownLatch(1);

        IgniteCache<String, String> cache = grid.jcache(null);

        cache.lock(key).lock();

        long start;

        try {
            start = System.currentTimeMillis();

            info("Start time: " + start);

            IgniteFuture<?> fut = ctx.partitionReleaseFuture(GRID_CNT + 1);

            assert fut != null;

            fut.listenAsync(new CI1<IgniteFuture<?>>() {
                @Override public void apply(IgniteFuture<?> e) {
                    end.set(System.currentTimeMillis());

                    latch.countDown();

                    info("End time: " + end.get());
                }
            });

            assert !fut.isDone() : "Failed waiting for locks [keyPart=" + keyPart + ", waitParts=" + waitParts + ", done="
                + fut.isDone() + ']';
        }
        finally {
            cache.lock(key).unlock();
        }

        latch.await();

        return end.get() - start;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java new file mode 100644 index 0000000..e3933f7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;

/**
 * Multithreaded full text query test: one thread stores values while another
 * repeatedly executes a full text query against the same cache.
 */
public class GridCacheFullTextQueryMultithreadedSelfTest extends GridCacheAbstractSelfTest {
    /** */
    private static final int GRID_CNT = 3;

    /** */
    private static final int TEST_TIMEOUT = 15 * 60 * 1000;

    /** {@inheritDoc} */
    @Override protected int gridCount() {
        return GRID_CNT;
    }

    /** {@inheritDoc} */
    @Override protected long getTestTimeout() {
        return TEST_TIMEOUT;
    }

    /** {@inheritDoc} */
    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
        CacheConfiguration cfg = super.cacheConfiguration(gridName);

        cfg.setCacheMode(PARTITIONED);
        cfg.setBackups(1);
        cfg.setWriteSynchronizationMode(FULL_SYNC);

        return cfg;
    }

    /**
     * JUnit.
     *
     * @throws Exception In case of error.
     */
    @SuppressWarnings({"TooBroadScope"})
    public void testH2Text() throws Exception {
        int duration = 60 * 1000;
        final int keyCnt = 5000;
        final int logFreq = 50;
        final String txt = "Value";

        final GridCache<Integer, H2TextValue> c = grid(0).cache(null);

        // Writer: stores keyCnt values, all containing the queried text.
        IgniteFuture<?> fut1 = multithreadedAsync(new Callable() {
            @Override public Object call() throws Exception {
                for (int i = 0; i < keyCnt; i++) {
                    c.putx(i, new H2TextValue(txt));

                    if (i % logFreq == 0)
                        X.println("Stored values: " + i);
                }

                return null;
            }
        }, 1);

        // Create query.
        final GridCacheQuery<Map.Entry<Integer, H2TextValue>> qry = c.queries().createFullTextQuery(
            H2TextValue.class, txt);

        qry.enableDedup(false);
        qry.includeBackups(false);
        qry.timeout(TEST_TIMEOUT);

        final AtomicBoolean stop = new AtomicBoolean();

        // Reader: keeps executing the query until asked to stop.
        IgniteFuture<?> fut2 = multithreadedAsync(new Callable() {
            @Override public Object call() throws Exception {
                int cnt = 0;

                while (!stop.get()) {
                    Collection<Map.Entry<Integer, H2TextValue>> res = qry.execute().get();

                    cnt++;

                    if (cnt % logFreq == 0) {
                        X.println("Result set: " + res.size());
                        X.println("Executed queries: " + cnt);
                    }
                }

                return null;
            }
        }, 1);

        try {
            Thread.sleep(duration);

            fut1.get();
        }
        finally {
            // Always release the reader, otherwise a writer failure or an interrupt
            // would leave the query loop spinning after the test has finished.
            stop.set(true);
        }

        fut2.get();
    }

    /**
     * Value with a single text-indexed field.
     */
    private static class H2TextValue {
        /** */
        @GridCacheQueryTextField
        private final String val;

        /**
         * @param val String value.
         */
        H2TextValue(String val) {
            this.val = val;
        }

        /**
         * @return String field value.
         */
        String value() {
            return val;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(H2TextValue.class, this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java new file mode 100644 index 0000000..37328d4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGenericTestStore.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static javax.cache.Cache.*;

/**
 * In-memory test store backed by a concurrent map. It records the last invoked method,
 * counts write/delete calls and can be switched to fail or to delay every operation.
 */
@SuppressWarnings({"TypeParameterExtendsFinalClass"})
public class GridCacheGenericTestStore<K, V> extends CacheStore<K, V> {
    /** Store. */
    private final Map<K, V> map = new ConcurrentHashMap<>();

    /** Last method called. Volatile because the store is called and inspected from different threads. */
    private volatile String lastMtd;

    /** Timestamp taken at creation or at the last reset. */
    private volatile long ts = System.currentTimeMillis();

    /** {@link #write(Entry)} method call counter .*/
    private final AtomicInteger putCnt = new AtomicInteger();

    /** {@link #writeAll(Collection)} method call counter .*/
    private final AtomicInteger putAllCnt = new AtomicInteger();

    /** {@link #delete(Object)} method call counter. */
    private final AtomicInteger rmvCnt = new AtomicInteger();

    /** {@link #deleteAll(Collection)} method call counter. */
    private final AtomicInteger rmvAllCnt = new AtomicInteger();

    /** Flag indicating if methods of this store should fail. */
    private volatile boolean shouldFail;

    /** Configurable delay to simulate slow storage. */
    private volatile int operationDelay;

    /**
     * @return Read-only view of the underlying map.
     */
    public Map<K, V> getMap() {
        return Collections.unmodifiableMap(map);
    }

    /**
     * Sets a flag indicating if methods of this class should fail with {@link IgniteException}.
     *
     * @param shouldFail {@code true} if should fail.
     */
    public void setShouldFail(boolean shouldFail) {
        this.shouldFail = shouldFail;
    }

    /**
     * Sets delay that this store should wait on each operation.
     *
     * @param operationDelay If zero, no delay applied, positive value means
     *        delay in milliseconds.
     */
    public void setOperationDelay(int operationDelay) {
        assert operationDelay >= 0;

        this.operationDelay = operationDelay;
    }

    /**
     *
     * @return Last method called.
     */
    public String getLastMethod() {
        return lastMtd;
    }

    /**
     * @return Last timestamp.
     */
    public long getTimestamp() {
        return ts;
    }

    /**
     * Returns the timestamp truncated to {@code int} as a non-negative number.
     *
     * @return Integer timestamp.
     */
    public int getStart() {
        int start = (int)ts;

        // Math.abs(Integer.MIN_VALUE) is still negative, so that single value is mapped to zero.
        return start == Integer.MIN_VALUE ? 0 : Math.abs(start);
    }

    /**
     * Sets last method to <tt>null</tt>.
     */
    public void resetLastMethod() {
        lastMtd = null;
    }

    /**
     * Resets timestamp.
     */
    public void resetTimestamp() {
        ts = System.currentTimeMillis();
    }

    /**
     * Resets the store to initial state: clears the data, the last method, all counters
     * and refreshes the timestamp. The failure flag and the delay are left untouched.
     */
    public void reset() {
        lastMtd = null;

        map.clear();

        putCnt.set(0);
        putAllCnt.set(0);
        rmvCnt.set(0);
        rmvAllCnt.set(0);

        ts = System.currentTimeMillis();
    }

    /**
     * @return Count of {@link #write(Entry)} method calls since last reset.
     */
    public int getPutCount() {
        return putCnt.get();
    }

    /**
     * @return Count of {@link #writeAll(Collection)} method calls since last reset.
     */
    public int getPutAllCount() {
        return putAllCnt.get();
    }

    /**
     * @return Number of {@link #delete(Object)} method calls since last reset.
     */
    public int getRemoveCount() {
        return rmvCnt.get();
    }

    /**
     * @return Number of {@link #deleteAll(Collection)} method calls since last reset.
     */
    public int getRemoveAllCount() {
        return rmvAllCnt.get();
    }

    /** {@inheritDoc} */
    @Override public V load(K key) {
        lastMtd = "load";

        checkOperation();

        return map.get(key);
    }

    /** {@inheritDoc} */
    @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object[] args) {
        lastMtd = "loadAllFull";

        checkOperation();
    }

    /** {@inheritDoc} */
    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
        lastMtd = "loadAll";

        Map<K, V> loaded = new HashMap<>();

        for (K key : keys) {
            V val = map.get(key);

            if (val != null)
                loaded.put(key, val);
        }

        checkOperation();

        return loaded;
    }

    /** {@inheritDoc} */
    @Override public void write(Cache.Entry<? extends K, ? extends V> e) {
        lastMtd = "put";

        checkOperation();

        map.put(e.getKey(), e.getValue());

        putCnt.incrementAndGet();
    }

    /** {@inheritDoc} */
    @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
        lastMtd = "putAll";

        checkOperation();

        for (Cache.Entry<? extends K, ? extends V> e : entries)
            this.map.put(e.getKey(), e.getValue());

        putAllCnt.incrementAndGet();
    }

    /** {@inheritDoc} */
    @Override public void delete(Object key) {
        lastMtd = "remove";

        checkOperation();

        map.remove(key);

        rmvCnt.incrementAndGet();
    }

    /** {@inheritDoc} */
    @Override public void deleteAll(Collection<?> keys) {
        lastMtd = "removeAll";

        checkOperation();

        for (Object key : keys)
            map.remove(key);

        rmvAllCnt.incrementAndGet();
    }

    /** {@inheritDoc} */
    @Override public void txEnd(boolean commit) {
        // No-op.
    }

    /**
     * Checks the flag and throws exception if it is set. Checks operation delay and sleeps
     * for specified amount of time, if needed.
     */
    private void checkOperation() {
        if (shouldFail)
            throw new IgniteException("Store exception");

        // Read once so that a concurrent change cannot be observed between the check and the sleep.
        int delay = operationDelay;

        if (delay > 0) {
            try {
                U.sleep(delay);
            }
            catch(IgniteInterruptedException e) {
                throw new IgniteException(e);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java new file mode 100644 index 0000000..a23a22c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.configuration.*; +import javax.cache.processor.*; +import java.io.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Basic get and transform store test: concurrent get/put/invoke against a cache configured with a test cache store. + */ +public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every configuration built by this test. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Cache store; a single static instance handed to every cache configuration built by this test. */ + private static final GridCacheTestStore store = new GridCacheTestStore(); + + /** + * Passes {@code true} (annotated "start grid" at the call site) to the superclass constructor. + */ + protected GridCacheGetAndTransformStoreAbstractTest() { + super(true /*start grid. */); + } + + /** {@inheritDoc} Resets the store timestamp. */ + @Override protected void beforeTest() throws Exception { + store.resetTimestamp(); + } + + /** {@inheritDoc} Clears the cache, then resets the store. */ + @Override protected void afterTest() throws Exception { + cache().clearAll(); + + store.reset(); + } + + /** @return Caching mode to set on the cache configuration. 
*/ + protected abstract GridCacheMode cacheMode(); + + /** {@inheritDoc} Adds a TCP discovery SPI using the shared IP finder and one cache backed by the test store with read-through and write-through enabled. */ + @SuppressWarnings("unchecked") + @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(cacheMode()); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setSwapEnabled(false); + cc.setAtomicityMode(atomicityMode()); + cc.setDistributionMode(distributionMode()); + cc.setPreloadMode(SYNC); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * @return Distribution mode; {@code NEAR_PARTITIONED} unless overridden. + */ + protected GridCacheDistributionMode distributionMode() { + return NEAR_PARTITIONED; + } + + /** + * @return Cache atomicity mode; {@code TRANSACTIONAL} unless overridden. + */ + protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** + * Starts grids 0-2 and runs get/put/invoke on random keys in [0, 100) concurrently until a 15 second sleep ends. + * + * @throws Exception If failed. 
+ */ + public void testGetAndTransform() throws Exception { + final AtomicBoolean finish = new AtomicBoolean(); + + try { + startGrid(0); + startGrid(1); + startGrid(2); + + final Processor entryProcessor = new Processor(); + + IgniteFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteCache<Integer, String> c = jcache(ThreadLocalRandom.current().nextInt(3)); + + while (!finish.get() && !Thread.currentThread().isInterrupted()) { + c.get(ThreadLocalRandom.current().nextInt(100)); + + c.put(ThreadLocalRandom.current().nextInt(100), "s"); + + c.invoke( + ThreadLocalRandom.current().nextInt(100), + entryProcessor); + } + + return null; + } + }, + 20); + + Thread.sleep(15_000); + + finish.set(true); + + fut.get(); + } + finally { + stopGrid(0); + stopGrid(1); + stopGrid(2); + + while (!cache().isEmpty()) + cache().globalClearAll(Long.MAX_VALUE); + } + } + + /** + * Entry processor that sets the entry value to {@code "str"} and returns {@code null}. + */ + private static class Processor implements EntryProcessor<Integer, String, Void>, Serializable { + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<Integer, String> e, Object... 
args) { + e.setValue("str"); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGlobalClearAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGlobalClearAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGlobalClearAllSelfTest.java new file mode 100644 index 0000000..a19af15 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGlobalClearAllSelfTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests the {@link org.apache.ignite.cache.GridCache#globalClearAll()} operation in a multinode environment with nodes + * having caches with different names. + */ +public class GridCacheGlobalClearAllSelfTest extends GridCommonAbstractTest { + /** Grid nodes count. */ + private static final int GRID_CNT = 3; + + /** Number of keys stored in the cache named {@link #CACHE_NAME}. */ + private static final int KEY_CNT = 20; + + /** Number of keys stored in the cache named {@link #CACHE_NAME_OTHER}. */ + private static final int KEY_CNT_OTHER = 10; + + /** Name of the cache that gets cleared by the test. */ + private static final String CACHE_NAME = "cache_name"; + + /** Cache name which differs from the default one. 
*/ + private static final String CACHE_NAME_OTHER = "cache_name_other"; + + /** VM IP finder for TCP discovery SPI. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Cache mode which will be passed to grid configuration. */ + private GridCacheMode cacheMode = PARTITIONED; + + /** Cache name which will be passed to grid configuration. */ + private String cacheName = CACHE_NAME; + + /** {@inheritDoc} Configures one transactional cache with the current name and mode (one backup when partitioned) and a TCP discovery SPI. */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(cacheName); + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setDistributionMode(NEAR_PARTITIONED); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Starts GRID_CNT nodes. All nodes except the last one will have one cache named CACHE_NAME, while the last + * one will have one cache of the same mode, but named CACHE_NAME_OTHER. + * + * @throws Exception In case of exception. + */ + private void startNodes() throws Exception { + cacheName = CACHE_NAME; + + for (int i = 0; i < GRID_CNT - 1; i++) + startGrid(i); + + cacheName = CACHE_NAME_OTHER; + + startGrid(GRID_CNT - 1); + } + + /** + * Test for partitioned cache. + * + * @throws Exception In case of exception. + */ + public void testGlobalClearAllPartitioned() throws Exception { + cacheMode = PARTITIONED; + + startNodes(); + + performTest(); + } + + /** + * Test for replicated cache. + * + * @throws Exception In case of exception. 
+ */ + public void testGlobalClearAllReplicated() throws Exception { + cacheMode = REPLICATED; + + startNodes(); + + performTest(); + } + + /** + * Ensures that globalClearAll() clears the correct cache and is only executed on nodes with the cache excluding + * master-node where it is executed locally. + * + * @throws Exception If failed. + */ + public void performTest() throws Exception { + // Put values into the cache under test (partitioned or replicated, depending on the calling test). + for (int i = 0; i < KEY_CNT; i++) + grid(0).cache(CACHE_NAME).put(i, "val" + i); + + // Put values into a cache with another name. + for (int i = 0; i < KEY_CNT_OTHER; i++) + grid(GRID_CNT - 1).cache(CACHE_NAME_OTHER).put(i, "val" + i); + + // Check cache sizes. + for (int i = 0; i < GRID_CNT - 1; i++) { + GridCache<Object, Object> cache = grid(i).cache(CACHE_NAME); + + assertEquals("Key set [i=" + i + ", keys=" + cache.keySet() + ']', KEY_CNT, cache.size()); + } + + assert grid(GRID_CNT - 1).cache(CACHE_NAME_OTHER).size() == KEY_CNT_OTHER; + + // Perform clear. + grid(0).cache(CACHE_NAME).globalClearAll(); + + // Expect caches with the given name to be clear on all nodes. + for (int i = 0; i < GRID_CNT - 1; i++) + assert grid(i).cache(CACHE_NAME).isEmpty(); + + // ... but cache with another name should remain untouched. + assert grid(GRID_CNT - 1).cache(CACHE_NAME_OTHER).size() == KEY_CNT_OTHER; + } +}