http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java new file mode 100644 index 0000000..8763dd1 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java @@ -0,0 +1,748 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.processors.query.h2.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Multi-threaded tests for cache queries. + */ +@SuppressWarnings("StatementWithEmptyBody") +public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static final boolean TEST_INFO = true; + + /** Number of test grids (nodes). Should not be less than 2. */ + private static final int GRID_CNT = 2; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static AtomicInteger idxSwapCnt = new AtomicInteger(); + + /** */ + private static AtomicInteger idxUnswapCnt = new AtomicInteger(); + + /** */ + private static final long DURATION = 30 * 1000; + + /** Don't start grid by default. */ + public IgniteCacheQueryMultiThreadedSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(CacheDistributionMode.NEAR_PARTITIONED); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setSwapEnabled(true); + cacheCfg.setBackups(1); + cacheCfg.setEvictionPolicy(evictsEnabled() ? new CacheLruEvictionPolicy(100) : null); + + CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); + + qcfg.setIndexPrimitiveKey(true); + + cacheCfg.setQueryConfiguration(qcfg); + + if (offheapEnabled() && evictsEnabled()) + cacheCfg.setOffHeapMaxMemory(1000); // Small offheap for evictions. + + cfg.setCacheConfiguration(cacheCfg); + + GridQueryConfiguration indexing = new GridQueryConfiguration(); + + indexing.setMaxOffheapRowsCacheSize(128); + + if (offheapEnabled()) + indexing.setMaxOffHeapMemory(0); + + cfg.setQueryConfiguration(indexing); + + GridQueryProcessor.idxCls = FakeIndexing.class; + + return cfg; + } + + /** + * + */ + private static class FakeIndexing extends IgniteH2Indexing { + @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException { + super.onSwap(spaceName, key); + + idxSwapCnt.incrementAndGet(); + } + + @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) + throws IgniteCheckedException { + super.onUnswap(spaceName, key, val, valBytes); + + idxUnswapCnt.incrementAndGet(); + } + } + + /** @return {@code true} If offheap enabled. */ + protected boolean offheapEnabled() { + return false; + } + + /** @return {@code true} If evictions enabled. */ + protected boolean evictsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + // Clean up all caches. + for (int i = 0; i < GRID_CNT; i++) { + GridCache<Object, Object> c = grid(i).cache(null); + + assertEquals(0, c.size()); + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or equal to 2."; + + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + if (evictsEnabled()) { + assertTrue(idxSwapCnt.get() > 0); + assertTrue(idxUnswapCnt.get() > 0); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + // Clean up all caches. + for (int i = 0; i < GRID_CNT; i++) { + GridCache<Object, Object> c = grid(i).cache(null); + + c.removeAll(F.<CacheEntry<Object, Object>>alwaysTrue()); + + Iterator<Map.Entry<Object, Object>> it = c.swapIterator(); + + while (it.hasNext()) { + it.next(); + + it.remove(); + } + + it = c.offHeapIterator(); + + while (it.hasNext()) { + it.next(); + + it.remove(); + } + + assertEquals("Swap keys: " + c.swapKeys(), 0, c.swapKeys()); + assertEquals(0, c.offHeapEntriesCount()); + assertEquals(0, c.size()); + } + } + + /** {@inheritDoc} */ + @Override protected void info(String msg) { + if (TEST_INFO) + super.info(msg); + } + + /** + * @param entries Entries. + * @param g Grid. + * @return Affinity nodes. + */ + private Set<UUID> affinityNodes(Iterable<Cache.Entry<Integer, Integer>> entries, Ignite g) { + Set<UUID> nodes = new HashSet<>(); + + for (Cache.Entry<Integer, Integer> entry : entries) + nodes.add(g.cache(null).affinity().mapKeyToPrimaryAndBackups(entry.getKey()).iterator().next().id()); + + return nodes; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapString() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, String> c = g.jcache(null); + final IgniteCache<Integer, Long> cl = g.jcache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c.query(new QuerySqlPredicate<Integer, String>("1 = 1")).getAll().size()); + assertEquals(0, cl.query(new QuerySqlPredicate<Integer, Long>("1 = 1")).getAll().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.put(i, String.valueOf(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + c.localEvict(Arrays.asList(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + switch (rnd.nextInt(5)) { + case 0: + c.put(rnd.nextInt(keyCnt), String.valueOf(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.localEvict(Arrays.asList(rnd.nextInt(keyCnt))); + + break; + case 2: + c.remove(rnd.nextInt(keyCnt)); + + break; + case 3: + c.get(rnd.nextInt(keyCnt)); + + break; + case 4: + int from = rnd.nextInt(valCnt); + + QueryCursor<Cache.Entry<Integer, String>> qry = c.query( + new QuerySqlPredicate<Integer, String>("_val between ? and ?", String.valueOf(from), + String.valueOf(from + 250))); + + Collection<Cache.Entry<Integer, String>> res = qry.getAll(); + + for (Cache.Entry<Integer, String> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapLong() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Long> c = g.jcache(null); + final IgniteCache<Integer, String> c1 = g.jcache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c1.query(new QuerySqlPredicate<Integer, String>("1 = 1")).getAll().size()); + assertEquals(0, c.query(new QuerySqlPredicate<Integer, Long>("1 = 1")).getAll().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.put(i, (long)rnd.nextInt(valCnt)); + + if (evictsEnabled() && rnd.nextBoolean()) + c.localEvict(Arrays.asList(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: + c.put(key, (long)rnd.nextInt(valCnt)); + + break; + case 1: + if (evictsEnabled()) + c.localEvict(Arrays.asList(key)); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + int from = rnd.nextInt(valCnt); + + Collection<Cache.Entry<Integer, Long>> res = c.query(new QuerySqlPredicate<Integer, Long>( + "_val between ? and ?", from, from + 250)).getAll(); + + for (Cache.Entry<Integer, Long> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapLongString() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Object> c = g.jcache(null); + + assertEquals(0, g.jcache(null).size()); + assertEquals(0, c.query(new QuerySqlPredicate<Integer, Object>("1 = 1")).getAll().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.put(i, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) : String.valueOf(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + c.localEvict(Arrays.asList(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: + c.put(key, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) : + String.valueOf(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.localEvict(Arrays.asList(key)); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + int from = rnd.nextInt(valCnt); + + Collection<Cache.Entry<Integer, Object>> res = c.query( + new QuerySqlPredicate<Integer, Object>("_val between ? and ?", from, from + 250)) + .getAll(); + + for (Cache.Entry<Integer, Object> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapObject() throws Exception { + int threadCnt = 50; + final int keyCnt = 4000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, TestValue> c = g.jcache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c.query(new QuerySqlPredicate<Integer, TestValue>("1 = 1")).getAll().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.put(i, new TestValue(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + c.localEvict(Arrays.asList(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: + c.put(key, new TestValue(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.localEvict(Arrays.asList(key)); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + int from = rnd.nextInt(valCnt); + + Collection<Cache.Entry<Integer, TestValue>> res = + c.query(new QuerySqlPredicate<Integer, TestValue>("TestValue.val between ? and ?", + from, from + 250)).getAll(); + + for (Cache.Entry<Integer, TestValue> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSameQuery() throws Exception { + int threadCnt = 50; + final int keyCnt = 10; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Integer> c = g.jcache(null); + + for (int i = 0; i < keyCnt; i++) { + c.put(i, i); + + c.localEvict(Arrays.asList(i)); + } + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + Collection<Cache.Entry<Integer, Integer>> entries = + c.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll(); + + assert entries != null; + + assertEquals("Query results [entries=" + entries + ", aff=" + affinityNodes(entries, g) + + ", iteration=" + iter + ']', keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = + ((GridKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + info("Finishing test..."); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedNewQueries() throws Exception { + int threadCnt = 50; + final int keyCnt = 10; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Integer> c = g.jcache(null); + + for (int i = 0; i < keyCnt; i++) { + c.put(i, i); + + c.localEvict(Arrays.asList(i)); + } + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + Collection<Cache.Entry<Integer, Integer>> entries = + c.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll(); + + assert entries != null; + + assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = + ((GridKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedScanQuery() throws Exception { + int threadCnt = 50; + final int keyCnt = 500; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache<Integer, Integer> c = g.jcache(null); + + for (int i = 0; i < keyCnt; i++) + c.put(i, i); + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + // Scan query. + Collection<Cache.Entry<Integer, Integer>> entries = + c.query(new QueryPredicate<Integer, Integer>() { + @Override public boolean apply(Cache.Entry<Integer, Integer> integerIntegerEntry) { + return true; + } + }).getAll(); + + assert entries != null; + + assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = + ((GridKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * Test value. + */ + private static class TestValue implements Serializable { + /** Value. */ + @CacheQuerySqlField + private int val; + + /** + * @param val Value. + */ + private TestValue(int val) { + this.val = val; + } + + /** + * @return Value. + */ + public int value() { + return val; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java new file mode 100644 index 0000000..dc25af5 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * Multi-threaded tests for cache queries. + */ +public class IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest extends IgniteCacheQueryOffheapMultiThreadedSelfTest { + /** {@inheritDoc} */ + @Override protected boolean evictsEnabled() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapMultiThreadedSelfTest.java new file mode 100644 index 0000000..cb4beaf --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapMultiThreadedSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * Queries over off-heap indexes. + */ +public class IgniteCacheQueryOffheapMultiThreadedSelfTest extends IgniteCacheQueryMultiThreadedSelfTest { + /** {@inheritDoc} */ + @Override protected boolean offheapEnabled() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java new file mode 100644 index 0000000..eacc4f3 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class IgniteCacheSqlQueryMultiThreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setDistributionMode(PARTITIONED_ONLY); + ccfg.setQueryIndexEnabled(true); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + + c.setCacheConfiguration(ccfg); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(2); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testQuery() throws Exception { + final IgniteCache<Integer, Person> cache = grid(0).jcache(null); + + for (int i = 0; i < 2000; i++) + cache.put(i, new Person(i)); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < 100; i++) { + Iterator<Cache.Entry<Integer, Person>> iter = + cache.query(new QuerySqlPredicate<Integer, Person>("Person", "age >= 0", 50, new Object[0])) + .iterator(); + + int cnt = 0; + + while (iter.next() != null) + cnt++; + + assertEquals(2000, cnt); + } + + return null; + } + }, 16, "test"); + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + @CacheQuerySqlField + private int age; + + /** + * @param age Age. + */ + Person(int age) { + this.age = age; + } + + /** + * @return Age/ + */ + public int age() { + return age; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledQuerySelfTest.java deleted file mode 100644 index be19ec2..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledQuerySelfTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.cache.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; - -/** - * Tests for atomic cache with near cache enabled. - */ -public class GridCacheAtomicNearEnabledQuerySelfTest extends GridCachePartitionedQuerySelfTest { - /** {@inheritDoc} */ - @Override protected CacheAtomicityMode atomicityMode() { - return ATOMIC; - } - - /** {@inheritDoc} */ - @Override protected CacheDistributionMode distributionMode() { - return NEAR_PARTITIONED; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicQuerySelfTest.java deleted file mode 100644 index ad0a7db..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicQuerySelfTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.cache.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; - -/** - * Tests for partitioned cache queries. - */ -public class GridCacheAtomicQuerySelfTest extends GridCachePartitionedQuerySelfTest { - /** {@inheritDoc} */ - @Override protected CacheAtomicityMode atomicityMode() { - return ATOMIC; - } - - /** {@inheritDoc} */ - @Override protected CacheDistributionMode distributionMode() { - return PARTITIONED_ONLY; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQueryP2PDisabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQueryP2PDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQueryP2PDisabledSelfTest.java deleted file mode 100644 index d19c4ba..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQueryP2PDisabledSelfTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.configuration.*; - -/** - * Tests for partitioned cache queries. - */ -public class GridCachePartitionedQueryP2PDisabledSelfTest extends GridCachePartitionedQuerySelfTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - c.setPeerClassLoadingEnabled(false); - - return c; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQuerySelfTest.java deleted file mode 100644 index 01debfc..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedQuerySelfTest.java +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.cache.CacheMode.*; - -/** - * Tests for partitioned cache queries. - */ -public class GridCachePartitionedQuerySelfTest extends GridCacheAbstractQuerySelfTest { - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return PARTITIONED; - } - - /** - * JUnit. - * - * @throws Exception If failed. - */ - public void testSingleNodeQuery() throws Exception { - Person p1 = new Person("Jon", 1500); - Person p2 = new Person("Jane", 2000); - Person p3 = new Person("Mike", 1800); - Person p4 = new Person("Bob", 1900); - - GridCache<UUID, Person> cache0 = grid(0).cache(null); - - cache0.put(p1.id(), p1); - cache0.put(p2.id(), p2); - cache0.put(p3.id(), p3); - cache0.put(p4.id(), p4); - - assertEquals(4, cache0.size()); - - CacheQuery<Map.Entry<UUID, Person>> qry = cache0.queries().createSqlQuery(Person.class, - "salary < 2000").projection(grid(0).forLocal()); - - // Include backup entries. - qry.includeBackups(true); - - // In order to get accumulated result from all queried nodes. - qry.keepAll(true); - - Collection<Map.Entry<UUID, Person>> entries = qry.execute().get(); - - assert entries != null; - - info("Queried persons: " + F.viewReadOnly(entries, F.<Person>mapEntry2Value())); - - assertEquals(3, entries.size()); - - checkResult(entries, p1, p3, p4); - } - - /** - * @throws Exception If failed. - */ - public void testFieldsQuery() throws Exception { - Person p1 = new Person("Jon", 1500); - Person p2 = new Person("Jane", 2000); - Person p3 = new Person("Mike", 1800); - Person p4 = new Person("Bob", 1900); - - Ignite ignite0 = grid(0); - - GridCache<UUID, Person> cache0 = ignite0.cache(null); - - cache0.put(p1.id(), p1); - cache0.put(p2.id(), p2); - cache0.put(p3.id(), p3); - cache0.put(p4.id(), p4); - - assertEquals(4, cache0.size()); - - // Fields query - CacheQuery<List<?>> qry = cache0.queries().createSqlFieldsQuery("select name from Person where salary > ?"). - projection(ignite0.cluster()); - - Collection<List<?>> res = qry.execute(1600).get(); - - assertEquals(3, res.size()); - - // Fields query count(*) - qry = cache0.queries().createSqlFieldsQuery("select count(*) from Person").projection(ignite0.cluster()); - - res = qry.execute().get(); - - int cnt = 0; - - for (List<?> row : res) - cnt += (Long)row.get(0); - - assertEquals(4, cnt); - } - - /** - * @throws Exception If failed. - */ - public void testMultipleNodesQuery() throws Exception { - Person p1 = new Person("Jon", 1500); - Person p2 = new Person("Jane", 2000); - Person p3 = new Person("Mike", 1800); - Person p4 = new Person("Bob", 1900); - - GridCache<UUID, Person> cache0 = grid(0).cache(null); - - cache0.put(p1.id(), p1); - cache0.put(p2.id(), p2); - cache0.put(p3.id(), p3); - cache0.put(p4.id(), p4); - - assertEquals(4, cache0.size()); - - assert grid(0).nodes().size() == gridCount(); - - CacheQuery<Map.Entry<UUID, Person>> qry = cache0.queries().createSqlQuery(Person.class, - "salary < 2000"); - - // Include backup entries and disable de-duplication. - qry.includeBackups(true); - qry.enableDedup(false); - - // In order to get accumulated result from all queried nodes. - qry.keepAll(true); - - // Execute on full projection, duplicates are expected. - Collection<Map.Entry<UUID, Person>> entries = qry.execute().get(); - - assert entries != null; - - info("Queried entries: " + entries); - - info("Queried persons: " + F.viewReadOnly(entries, F.<Person>mapEntry2Value())); - - // Expect result including backup persons. - assertEquals(3 * gridCount(), entries.size()); - - checkResult(entries, p1, p3, p4); - - // Now do the same filtering but using projection. - qry = cache0.projection(F.<UUID, Person>cachePrimary()).queries().createSqlQuery(Person.class, - "salary < 2000"); - - qry.keepAll(true); - - entries = qry.execute().get(); - - assert entries != null; - - info("Queried persons: " + F.viewReadOnly(entries, F.<Person>mapEntry2Value())); - - // Expect result including backup persons. - assertEquals(3, entries.size()); - - checkResult(entries, p1, p3, p4); - } - - /** - * @throws Exception If failed. - */ - public void testIncludeBackupsAndEnableDedup() throws Exception { - Person p1 = new Person("Jon", 1500); - Person p2 = new Person("Jane", 2000); - Person p3 = new Person("Mike", 1800); - Person p4 = new Person("Bob", 1900); - - GridCache<UUID, Person> cache0 = grid(0).cache(null); - - cache0.put(p1.id(), p1); - cache0.put(p2.id(), p2); - cache0.put(p3.id(), p3); - cache0.put(p4.id(), p4); - - // Retry several times. - for (int i = 0; i < 10; i++) { - CacheQuery<Map.Entry<UUID, Person>> qry = cache0.queries().createSqlQuery(Person.class, - "salary < 2000"); - - // Include backup entries and disable de-duplication. - qry.includeBackups(true); - qry.enableDedup(false); - - Collection<Map.Entry<UUID, Person>> entries = qry.execute().get(); - - info("Entries: " + entries); - - assertEquals(gridCount() * 3, entries.size()); - - // Recreate query since we cannot use the old one. - qry = cache0.queries().createSqlQuery(Person.class, "salary < 2000"); - - // Exclude backup entries and enable de-duplication. - qry.includeBackups(false); - qry.enableDedup(true); - - entries = qry.execute().get(); - - assertEquals(3, entries.size()); - } - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("FloatingPointEquality") - public void testScanReduceQuery() throws Exception { - GridCache<UUID, Person> c = ignite.cache(null); - - Person p1 = new Person("Bob White", 1000); - Person p2 = new Person("Tom White", 2000); - Person p3 = new Person("Mike Green", 20000); - - c.put(p1.id(), p1); - c.put(p2.id(), p2); - c.put(p3.id(), p3); - - CacheQuery<Map.Entry<UUID, Person>> q = c.queries().createScanQuery(new P2<UUID, Person>() { - @Override public boolean apply(UUID k, Person p) { - return p.salary() < 20000; - } - }); - - R1<IgnitePair<Integer>, Double> locRdc = new R1<IgnitePair<Integer>, Double>() { - private double sum; - - private int cnt; - - @Override public boolean collect(IgnitePair<Integer> p) { - sum += p.get1(); - cnt += p.get2(); - - return true; - } - - @Override public Double reduce() { - return sum / cnt; - } - }; - - Collection<IgnitePair<Integer>> res = q.execute(new R1<Map.Entry<UUID, Person>, IgnitePair<Integer>>() { - private int sum; - - private int cnt; - - @Override public boolean collect(Map.Entry<UUID, Person> e) { - sum += e.getValue().salary(); - cnt++; - - return true; - } - - @Override public IgnitePair<Integer> reduce() { - return new IgnitePair<>(sum, cnt); - } - }).get(); - - assertEquals(1500., F.reduce(res, locRdc)); - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("FloatingPointEquality") - public void testSqlReduceQuery() throws Exception { - GridCache<UUID, Person> c = ignite.cache(null); - - Person p1 = new Person("Bob White", 1000); - Person p2 = new Person("Tom White", 2000); - Person p3 = new Person("Mike Green", 20000); - - c.put(p1.id(), p1); - c.put(p2.id(), p2); - c.put(p3.id(), p3); - - CacheQuery<Map.Entry<UUID, Person>> q = c.queries().createSqlQuery(Person.class, "salary < 20000"); - - R1<IgnitePair<Integer>, Double> locRdc = new R1<IgnitePair<Integer>, Double>() { - private double sum; - - private int cnt; - - @Override public boolean collect(IgnitePair<Integer> p) { - sum += p.get1(); - cnt += p.get2(); - - return true; - } - - @Override public Double reduce() { - return sum / cnt; - } - }; - - Collection<IgnitePair<Integer>> res = q.execute(new R1<Map.Entry<UUID, Person>, IgnitePair<Integer>>() { - private int sum; - - private int cnt; - - @Override public boolean collect(Map.Entry<UUID, Person> e) { - sum += e.getValue().salary(); - cnt++; - - return true; - } - - @Override public IgnitePair<Integer> reduce() { - return new IgnitePair<>(sum, cnt); - } - }).get(); - - assert F.reduce(res, locRdc) == 1500; - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("FloatingPointEquality") - public void testLuceneReduceQuery() throws Exception { - GridCache<UUID, Person> c = ignite.cache(null); - - Person p1 = new Person("Bob White", 1000); - Person p2 = new Person("Tom White", 2000); - Person p3 = new Person("Mike Green", 20000); - - c.put(p1.id(), p1); - c.put(p2.id(), p2); - c.put(p3.id(), p3); - - CacheQuery<Map.Entry<UUID, Person>> q = c.queries().createFullTextQuery(Person.class, "White"); - - R1<IgnitePair<Integer>, Double> locRdc = new R1<IgnitePair<Integer>, Double>() { - private double sum; - - private int cnt; - - @Override public boolean collect(IgnitePair<Integer> p) { - sum += p.get1(); - cnt += p.get2(); - - return true; - } - - @Override public Double reduce() { - return sum / cnt; - } - }; - - Collection<IgnitePair<Integer>> res = q.execute(new R1<Map.Entry<UUID, Person>, IgnitePair<Integer>>() { - private int sum; - - private int cnt; - - @Override public boolean collect(Map.Entry<UUID, Person> e) { - sum += e.getValue().salary(); - cnt++; - - return true; - } - - @Override public IgnitePair<Integer> reduce() { - return new IgnitePair<>(sum, cnt); - } - }).get(); - - assert F.reduce(res, locRdc) == 1500; - } - - /** - * @throws Exception If failed. - */ - public void testPaginationGet0() throws Exception { - int key = 0; - - for (int i = 0; i < gridCount(); i++) { - int cnt = 0; - - while (true) { - if (grid(i).cache(null).affinity().mapKeyToNode(key).equals(grid(i).localNode())) { - assertTrue(grid(i).cache(null).putx(key, key)); - - cnt++; - } - - key++; - - if (cnt == (i == 1 ? 2 : 3)) - break; - } - } - - for (int i = 0; i < gridCount(); i++) - assertEquals(i == 1 ? 2 : 3, grid(i).cache(null).primarySize()); - - GridCache<Integer, Integer> cache = ignite.cache(null); - - CacheQuery<Map.Entry<Integer, Integer>> q = cache.queries().createSqlQuery(Integer.class, "_key >= 0"); - - q.pageSize(2); - q.includeBackups(false); - q.enableDedup(true); - - Collection<Map.Entry<Integer, Integer>> res = q.execute().get(); - - assertEquals(gridCount() * 3 - 1, res.size()); - } - - /** - * @throws Exception If failed. - */ - public void testReduceWithPagination() throws Exception { - GridCache<Integer, Integer> c = grid(0).cache(null); - - for (int i = 0; i < 50; i++) - assertTrue(c.putx(i, 10)); - - CacheQuery<Map.Entry<Integer, Integer>> q = c.queries().createSqlQuery(Integer.class, "_key >= 0"); - - q.pageSize(10); - - int res = F.sumInt(q.execute(new R1<Map.Entry<Integer, Integer>, Integer>() { - private int sum; - - @Override public boolean collect(@Nullable Map.Entry<Integer, Integer> e) { - sum += e.getValue(); - - return true; - } - - @Override public Integer reduce() { - return sum; - } - }).get()); - - assertEquals(500, res); - } - - /** - * @param entries Queried result. - * @param persons Persons that should be in the result. - */ - private void checkResult(Iterable<Map.Entry<UUID, Person>> entries, Person... persons) { - for (Map.Entry<UUID, Person> entry : entries) { - assertEquals(entry.getKey(), entry.getValue().id()); - - assert F.<Person>asList(persons).contains(entry.getValue()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheQueryNodeRestartSelfTest.java deleted file mode 100644 index 33a5001..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheQueryNodeRestartSelfTest.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; - -/** - * Test for distributed queries with node restarts. - */ -public class GridCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTest { - /** */ - private static final int GRID_CNT = 3; - - /** */ - private static final int KEY_CNT = 1000; - - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return GRID_CNT; - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 90 * 1000; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); - - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(PARTITIONED); - cc.setBackups(1); - cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(NEAR_PARTITIONED); - - CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); - - qcfg.setIndexPrimitiveKey(true); - - cc.setQueryConfiguration(qcfg); - - c.setCacheConfiguration(cc); - - return c; - } - - /** - * JUnit. - * - * @throws Exception If failed. - */ - @SuppressWarnings({"TooBroadScope"}) - public void testRestarts() throws Exception { - int duration = 60 * 1000; - int qryThreadNum = 10; - final long nodeLifeTime = 2 * 1000; - final int logFreq = 20; - - final GridCache<Integer, Integer> cache = grid(0).cache(null); - - assert cache != null; - - for (int i = 0; i < KEY_CNT; i++) - cache.put(i, i); - - assertEquals(KEY_CNT, cache.size()); - - final AtomicInteger qryCnt = new AtomicInteger(); - - final AtomicBoolean done = new AtomicBoolean(); - - IgniteFuture<?> fut1 = multithreadedAsync(new CAX() { - @Override public void applyx() throws IgniteCheckedException { - while (!done.get()) { - CacheQuery<Map.Entry<Integer, Integer>> qry = - cache.queries().createSqlQuery(Integer.class, "_val >= 0"); - - qry.includeBackups(true); - qry.keepAll(true); - - assertFalse(qry.execute().get().isEmpty()); - - int c = qryCnt.incrementAndGet(); - - if (c % logFreq == 0) - info("Executed queries: " + c); - } - } - }, qryThreadNum); - - final AtomicInteger restartCnt = new AtomicInteger(); - - CollectingEventListener lsnr = new CollectingEventListener(); - - for (int i = 0; i < GRID_CNT; i++) - grid(i).events().localListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); - - IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { - @SuppressWarnings({"BusyWait"}) - @Override public Object call() throws Exception { - while (!done.get()) { - int idx = GRID_CNT; - - startGrid(idx); - - Thread.sleep(nodeLifeTime); - - stopGrid(idx); - - int c = restartCnt.incrementAndGet(); - - if (c % logFreq == 0) - info("Node restarts: " + c); - } - - return true; - } - }, 1); - - Thread.sleep(duration); - - done.set(true); - - fut1.get(); - fut2.get(); - - info("Awaiting preload events [restartCnt=" + restartCnt.get() + ']'); - - boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000); - - for (int i = 0; i < GRID_CNT; i++) - grid(i).events().stopLocalListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); - - assert success; - } - - /** Listener that will wait for specified number of events received. */ - private class CollectingEventListener implements IgnitePredicate<IgniteEvent> { - /** Registered events count. */ - private int evtCnt; - - /** {@inheritDoc} */ - @Override public synchronized boolean apply(IgniteEvent evt) { - evtCnt++; - - info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']'); - - notifyAll(); - - return true; - } - - /** - * Waits until total number of events processed is equal or greater then argument passed. - * - * @param cnt Number of events to wait. - * @param timeout Timeout to wait. - * @return {@code True} if successfully waited, {@code false} if timeout happened. - * @throws InterruptedException If thread is interrupted. - */ - public synchronized boolean awaitEvents(int cnt, long timeout) throws InterruptedException { - long start = U.currentTimeMillis(); - - long now = start; - - while (start + timeout > now) { - if (evtCnt >= cnt) - return true; - - wait(start + timeout - now); - - now = U.currentTimeMillis(); - } - - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledQuerySelfTest.java new file mode 100644 index 0000000..f0d8f77 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledQuerySelfTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; + +/** + * Tests for atomic cache with near cache enabled. + */ +public class IgniteCacheAtomicNearEnabledQuerySelfTest extends IgniteCachePartitionedQuerySelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode distributionMode() { + return NEAR_PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicQuerySelfTest.java new file mode 100644 index 0000000..8ff44f5 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicQuerySelfTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; + +/** + * Tests for partitioned cache queries. + */ +public class IgniteCacheAtomicQuerySelfTest extends IgniteCachePartitionedQuerySelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryP2PDisabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryP2PDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryP2PDisabledSelfTest.java new file mode 100644 index 0000000..4779a98 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryP2PDisabledSelfTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.*; + +/** + * Tests for partitioned cache queries. + */ +public class IgniteCachePartitionedQueryP2PDisabledSelfTest extends IgniteCachePartitionedQuerySelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setPeerClassLoadingEnabled(false); + + return c; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java new file mode 100644 index 0000000..fa62019 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; + +import javax.cache.Cache; +import java.util.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Tests for partitioned cache queries. + */ +public class IgniteCachePartitionedQuerySelfTest extends IgniteCacheAbstractQuerySelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testSingleNodeQuery() throws Exception { + Person p1 = new Person("Jon", 1500); + Person p2 = new Person("Jane", 2000); + Person p3 = new Person("Mike", 1800); + Person p4 = new Person("Bob", 1900); + + IgniteCache<UUID, Person> cache0 = grid(0).jcache(null); + + cache0.put(p1.id(), p1); + cache0.put(p2.id(), p2); + cache0.put(p3.id(), p3); + cache0.put(p4.id(), p4); + + assertEquals(4, cache0.size()); + + QueryCursor<Cache.Entry<UUID, Person>> qry = + cache0.localQuery(new QuerySqlPredicate<UUID, Person>("salary < 2000")); + + Collection<Cache.Entry<UUID, Person>> entries = qry.getAll(); + + assert entries != null; + + assertEquals(3, entries.size()); + + checkResult(entries, p1, p3, p4); + } + + /** + * @throws Exception If failed. + */ + public void testFieldsQuery() throws Exception { + Person p1 = new Person("Jon", 1500); + Person p2 = new Person("Jane", 2000); + Person p3 = new Person("Mike", 1800); + Person p4 = new Person("Bob", 1900); + + Ignite ignite0 = grid(0); + + IgniteCache<UUID, Person> cache0 = ignite0.jcache(null); + + cache0.put(p1.id(), p1); + cache0.put(p2.id(), p2); + cache0.put(p3.id(), p3); + cache0.put(p4.id(), p4); + + assertEquals(4, cache0.size()); + + // Fields query + QueryCursor<List<?>> qry = cache0 + .queryFields(new QuerySqlPredicate<UUID, Person>("select name from Person where salary > ?", 1600)); + + Collection<List<?>> res = qry.getAll(); + + assertEquals(3, res.size()); + + // Fields query count(*) + qry = cache0.queryFields(new QuerySqlPredicate<UUID, Person>("select count(*) from Person")); + + res = qry.getAll(); + + int cnt = 0; + + for (List<?> row : res) + cnt += (Long)row.get(0); + + assertEquals(4, cnt); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleNodesQuery() throws Exception { + Person p1 = new Person("Jon", 1500); + Person p2 = new Person("Jane", 2000); + Person p3 = new Person("Mike", 1800); + Person p4 = new Person("Bob", 1900); + + IgniteCache<UUID, Person> cache0 = grid(0).jcache(null); + + cache0.put(p1.id(), p1); + cache0.put(p2.id(), p2); + cache0.put(p3.id(), p3); + cache0.put(p4.id(), p4); + + assertEquals(4, cache0.size()); + + assert grid(0).nodes().size() == gridCount(); + + QueryCursor<Cache.Entry<UUID, Person>> qry = + cache0.query(new QuerySqlPredicate<UUID, Person>("salary < 2000")); + + // Execute on full projection, duplicates are expected. + Collection<Cache.Entry<UUID, Person>> entries = qry.getAll(); + + assert entries != null; + + info("Queried entries: " + entries); + + // Expect result including backup persons. + assertEquals(3 * gridCount(), entries.size()); + + checkResult(entries, p1, p3, p4); + } + + /** + * @param entries Queried result. + * @param persons Persons that should be in the result. + */ + private void checkResult(Iterable<Cache.Entry<UUID, Person>> entries, Person... persons) { + for (Cache.Entry<UUID, Person> entry : entries) { + assertEquals(entry.getKey(), entry.getValue().id()); + + assert F.<Person>asList(persons).contains(entry.getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java new file mode 100644 index 0000000..a8da960 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import javax.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Test for distributed queries with node restarts. + */ +public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final int KEY_CNT = 1000; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 90 * 1000; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + + CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); + + qcfg.setIndexPrimitiveKey(true); + + cc.setQueryConfiguration(qcfg); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testRestarts() throws Exception { + int duration = 60 * 1000; + int qryThreadNum = 10; + final long nodeLifeTime = 2 * 1000; + final int logFreq = 20; + + final IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + assert cache != null; + + for (int i = 0; i < KEY_CNT; i++) + cache.put(i, i); + + assertEquals(KEY_CNT, cache.size()); + + final AtomicInteger qryCnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut1 = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!done.get()) { + Collection<Cache.Entry<Integer, Integer>> res = + cache.query(new QuerySqlPredicate<Integer, Integer>("_val >= 0")).getAll(); + + assertFalse(res.isEmpty()); + + int c = qryCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Executed queries: " + c); + } + } + }, qryThreadNum); + + final AtomicInteger restartCnt = new AtomicInteger(); + + CollectingEventListener lsnr = new CollectingEventListener(); + + for (int i = 0; i < GRID_CNT; i++) + grid(i).events().localListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); + + IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @SuppressWarnings({"BusyWait"}) + @Override public Object call() throws Exception { + while (!done.get()) { + int idx = GRID_CNT; + + startGrid(idx); + + Thread.sleep(nodeLifeTime); + + stopGrid(idx); + + int c = restartCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Node restarts: " + c); + } + + return true; + } + }, 1); + + Thread.sleep(duration); + + done.set(true); + + fut1.get(); + fut2.get(); + + info("Awaiting preload events [restartCnt=" + restartCnt.get() + ']'); + + boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000); + + for (int i = 0; i < GRID_CNT; i++) + grid(i).events().stopLocalListen(lsnr, IgniteEventType.EVT_CACHE_PRELOAD_STOPPED); + + assert success; + } + + /** Listener that will wait for specified number of events received. */ + private class CollectingEventListener implements IgnitePredicate<IgniteEvent> { + /** Registered events count. */ + private int evtCnt; + + /** {@inheritDoc} */ + @Override public synchronized boolean apply(IgniteEvent evt) { + evtCnt++; + + info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']'); + + notifyAll(); + + return true; + } + + /** + * Waits until total number of events processed is equal or greater then argument passed. + * + * @param cnt Number of events to wait. + * @param timeout Timeout to wait. + * @return {@code True} if successfully waited, {@code false} if timeout happened. + * @throws InterruptedException If thread is interrupted. + */ + public synchronized boolean awaitEvents(int cnt, long timeout) throws InterruptedException { + long start = U.currentTimeMillis(); + + long now = start; + + while (start + timeout > now) { + if (evtCnt >= cnt) + return true; + + wait(start + timeout - now); + + now = U.currentTimeMillis(); + } + + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df488d08/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQueryP2PDisabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQueryP2PDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQueryP2PDisabledSelfTest.java deleted file mode 100644 index 9b59faa..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedQueryP2PDisabledSelfTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.replicated; - -import org.apache.ignite.configuration.*; - -/** - * Tests replicated query. - */ -public class GridCacheReplicatedQueryP2PDisabledSelfTest extends GridCacheReplicatedQuerySelfTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - c.setPeerClassLoadingEnabled(false); - - return c; - } -}