/*
 * Origin of this file (commit-mail patch header, kept verbatim):
 *
 * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java
 * ----------------------------------------------------------------------
 * diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java
 * new file mode 100644
 * index 0000000..db421c0
 * --- /dev/null
 * +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryLoadSelfTest.java
 * @@ -0,0 +1,458 @@
 */

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.testframework.junits.common.*;
import org.jetbrains.annotations.*;

import javax.cache.*;
import javax.cache.configuration.*;
import java.util.*;

import static org.apache.ignite.cache.GridCacheMode.*;
import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;

/**
 * Test that entries are indexed on load/reload methods.
 * <p>
 * Each test brings entries into the cache through {@link TestStore} (by {@code loadCache},
 * {@code reload} or {@code reloadAll}, synchronously or asynchronously) and then verifies three
 * numbers against the expected count: the cache size, the number of rows returned by the SQL query
 * {@code val >= 0} over {@link ValueObject}, and the size reported by the query manager for
 * {@link ValueObject}.
 * <p>
 * All checks use JUnit assertions rather than the Java {@code assert} statement, so that neither the
 * checks nor the cache operations they wrap depend on the JVM being started with {@code -ea}.
 */
public class GridCacheQueryLoadSelfTest extends GridCommonAbstractTest {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** Puts count. */
    private static final int PUT_CNT = 10;

    /** Smallest key accepted by the filters of the "filtered" tests; keys below it are rejected. */
    private static final int FILTER_FROM = 5;

    /** Number of keys in the range {@code [FILTER_FROM, PUT_CNT)}. */
    private static final int FILTERED_CNT = PUT_CNT - FILTER_FROM;

    /** SQL clause that matches every {@link ValueObject} created by these tests (values are never negative). */
    private static final String ALL_VALUES_CLAUSE = "val >= 0";

    /** Store map. Backs {@link TestStore#load}, {@link TestStore#write} and {@link TestStore#delete}. */
    private static final Map<Integer, ValueObject> STORE_MAP = new HashMap<>();

    /** */
    public GridCacheQueryLoadSelfTest() {
        // NOTE(review): 'true' presumably asks the base class to start a grid for the tests - confirm.
        super(true);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        CacheConfiguration ccfg = defaultCacheConfiguration();

        ccfg.setCacheMode(REPLICATED);
        ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore()));
        ccfg.setReadThrough(true);
        ccfg.setWriteThrough(true);
        ccfg.setLoadPreviousValue(true);
        ccfg.setWriteSynchronizationMode(FULL_SYNC);

        cfg.setCacheConfiguration(ccfg);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(IP_FINDER);

        cfg.setDiscoverySpi(disco);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        try {
            cache().removeAll();

            assertTrue(cache().isEmpty());
            assertEquals(0, size(ValueObject.class));
        }
        finally {
            // Reset the store even when a check above fails, so one failure does not leak into the next test.
            STORE_MAP.clear();
        }
    }

    /**
     * Number of objects of given type in index.
     *
     * @param cls Value type.
     * @return Objects number.
     * @throws IgniteCheckedException If failed.
     */
    private long size(Class<?> cls) throws IgniteCheckedException {
        GridCacheQueryManager<Object, Object> qryMgr = ((GridKernal)grid()).internalCache().context().queries();

        assertNotNull(qryMgr);

        return qryMgr.size(cls);
    }

    /**
     * Verifies that the cache holds exactly {@code expCnt} entries, that the SQL query
     * {@link #ALL_VALUES_CLAUSE} returns the same number of rows and that {@link #size(Class)}
     * reports the same number for {@link ValueObject}.
     *
     * @param cache Cache to check.
     * @param expCnt Expected number of entries.
     * @throws Exception If failed.
     */
    private void checkLoadedAndIndexed(GridCache<Integer, ValueObject> cache, int expCnt) throws Exception {
        assertEquals(expCnt, cache.size());

        Collection<Map.Entry<Integer, ValueObject>> res =
            cache.queries().createSqlQuery(ValueObject.class, ALL_VALUES_CLAUSE).execute().get();

        assertNotNull(res);
        assertEquals(expCnt, res.size());
        assertEquals(expCnt, size(ValueObject.class));
    }

    /**
     * Calls {@code clear(key)} for every given key and verifies that the cache is empty afterwards.
     *
     * @param cache Cache to clear.
     * @param keys Keys to clear.
     * @throws Exception If failed.
     */
    private void clearAndCheckEmpty(GridCache<Integer, ValueObject> cache, Integer[] keys) throws Exception {
        for (Integer key : keys)
            cache.clear(key);

        assertTrue(cache.isEmpty());
        assertEquals(0, cache.size());
    }

    /**
     * @param from First key (inclusive).
     * @param to Last key (exclusive).
     * @return Array with all keys of the range {@code [from, to)} in ascending order.
     */
    private static Integer[] keys(int from, int to) {
        Integer[] keys = new Integer[to - from];

        for (int i = 0; i < keys.length; i++)
            keys[i] = from + i;

        return keys;
    }

    /**
     * Puts values for keys {@code [0, PUT_CNT)} directly into the store map, bypassing the cache.
     */
    private static void fillStore() {
        for (int i = 0; i < PUT_CNT; i++)
            STORE_MAP.put(i, new ValueObject(i));
    }

    /**
     * @return Filter for {@code loadCache} that accepts only keys not less than {@link #FILTER_FROM}.
     */
    private static P2<Integer, ValueObject> loadFilter() {
        return new P2<Integer, ValueObject>() {
            @Override public boolean apply(Integer key, ValueObject val) {
                return key >= FILTER_FROM;
            }
        };
    }

    /**
     * @return Projection filter that accepts only entries with keys not less than {@link #FILTER_FROM}.
     */
    private static P1<GridCacheEntry<Integer, ValueObject>> entryFilter() {
        return new P1<GridCacheEntry<Integer, ValueObject>>() {
            @Override public boolean apply(GridCacheEntry<Integer, ValueObject> e) {
                return e.getKey() >= FILTER_FROM;
            }
        };
    }

    /**
     * Loads the whole store without a filter and expects all {@link #PUT_CNT} entries to be indexed.
     *
     * @throws Exception If failed.
     */
    public void testLoadCache() throws Exception {
        GridCache<Integer, ValueObject> cache = cache();

        cache.loadCache(null, 0);

        checkLoadedAndIndexed(cache, PUT_CNT);
    }

    /**
     * Asynchronous variant of {@link #testLoadCache()}.
     *
     * @throws Exception If failed.
     */
    public void testLoadCacheAsync() throws Exception {
        GridCache<Integer, ValueObject> cache = cache();

        cache.loadCacheAsync(null, 0).get();

        checkLoadedAndIndexed(cache, PUT_CNT);
    }

    /**
     * Loads the store through {@link #loadFilter()} and expects only the accepted entries to be indexed.
     *
     * @throws Exception If failed.
     */
    public void testLoadCacheFiltered() throws Exception {
        GridCache<Integer, ValueObject> cache = cache();

        cache.loadCache(loadFilter(), 0);

        checkLoadedAndIndexed(cache, FILTERED_CNT);
    }

    /**
     * Asynchronous variant of {@link #testLoadCacheFiltered()}.
     *
     * @throws Exception If failed.
     */
    public void testLoadCacheAsyncFiltered() throws Exception {
        GridCache<Integer, ValueObject> cache = cache();

        cache.loadCacheAsync(loadFilter(), 0).get();

        checkLoadedAndIndexed(cache, FILTERED_CNT);
    }

    /**
     * Reloads a single key that exists only in the store and expects it to be indexed.
     *
     * @throws Exception If failed.
     */
    public void testReload() throws Exception {
        STORE_MAP.put(1, new ValueObject(1));

        GridCache<Integer, ValueObject> cache = cache();

        ValueObject vo = cache.reload(1);

        assertNotNull(vo);
        assertEquals(1, vo.value());

        checkLoadedAndIndexed(cache, 1);
    }

    /**
     * Asynchronous variant of {@link #testReload()}.
     *
     * @throws Exception If failed.
     */
    public void testReloadAsync() throws Exception {
        STORE_MAP.put(1, new ValueObject(1));

        GridCache<Integer, ValueObject> cache = cache();

        // The reload is performed outside of any assertion so that it always runs.
        ValueObject vo = cache.reloadAsync(1).get();

        assertNotNull(vo);
        assertEquals(1, vo.value());

        checkLoadedAndIndexed(cache, 1);
    }

    /**
     * Reloads keys {@code [FILTER_FROM, PUT_CNT)} from the store, clears them and reloads them again,
     * expecting them to be indexed after each reload.
     *
     * @throws Exception If failed.
     */
    public void testReloadAll() throws Exception {
        fillStore();

        GridCache<Integer, ValueObject> cache = cache();

        Integer[] keys = keys(FILTER_FROM, PUT_CNT);

        cache.reloadAll(F.asList(keys));

        checkLoadedAndIndexed(cache, FILTERED_CNT);

        clearAndCheckEmpty(cache, keys);

        cache.reloadAll(Arrays.asList(keys));

        checkLoadedAndIndexed(cache, FILTERED_CNT);
    }

    /**
     * Asynchronous variant of {@link #testReloadAll()}.
     *
     * @throws Exception If failed.
     */
    public void testReloadAllAsync() throws Exception {
        fillStore();

        GridCache<Integer, ValueObject> cache = cache();

        Integer[] keys = keys(FILTER_FROM, PUT_CNT);

        cache.reloadAllAsync(F.asList(keys)).get();

        checkLoadedAndIndexed(cache, FILTERED_CNT);

        // Invalidate will remove entries.
        clearAndCheckEmpty(cache, keys);

        cache.reloadAllAsync(Arrays.asList(keys)).get();

        checkLoadedAndIndexed(cache, FILTERED_CNT);
    }

    /**
     * Puts {@link #PUT_CNT} entries, clears them all from the cache and reloads all keys through a
     * projection built with {@link #entryFilter()}, expecting only the accepted entries to come back.
     *
     * @throws Exception If failed.
     */
    public void testReloadAllFiltered() throws Exception {
        GridCache<Integer, ValueObject> cache = cache();

        // The put is the argument of a JUnit assertion, so it is always executed.
        for (int i = 0; i < PUT_CNT; i++)
            assertTrue(cache.putx(i, new ValueObject(i)));

        assertEquals(PUT_CNT, cache.size());

        Integer[] keys = keys(0, PUT_CNT);

        clearAndCheckEmpty(cache, keys);

        cache.projection(entryFilter()).reloadAll(Arrays.asList(keys));

        checkLoadedAndIndexed(cache, FILTERED_CNT);
    }

    /**
     * Asynchronous variant of {@link #testReloadAllFiltered()}.
     *
     * @throws Exception If failed.
     */
    public void testReloadAllAsyncFiltered() throws Exception {
        GridCache<Integer, ValueObject> cache = cache();

        // The put is the argument of a JUnit assertion, so it is always executed.
        for (int i = 0; i < PUT_CNT; i++)
            assertTrue(cache.putx(i, new ValueObject(i)));

        assertEquals(PUT_CNT, cache.size());

        Integer[] keys = keys(0, PUT_CNT);

        clearAndCheckEmpty(cache, keys);

        cache.projection(entryFilter()).reloadAllAsync(Arrays.asList(keys)).get();

        checkLoadedAndIndexed(cache, FILTERED_CNT);
    }

    /**
     * Test store.
     * <p>
     * {@code loadCache} always produces keys {@code [0, PUT_CNT)} on its own; the per-key operations
     * work against {@link #STORE_MAP}.
     */
    private static class TestStore extends CacheStoreAdapter<Integer, ValueObject> {
        /** {@inheritDoc} */
        @Override public void loadCache(IgniteBiInClosure<Integer, ValueObject> clo, @Nullable Object... args) {
            assert clo != null;

            for (int i = 0; i < PUT_CNT; i++)
                clo.apply(i, new ValueObject(i));
        }

        /** {@inheritDoc} */
        @Override public ValueObject load(Integer key) {
            assert key != null;

            return STORE_MAP.get(key);
        }

        /** {@inheritDoc} */
        @Override public void write(Cache.Entry<? extends Integer, ? extends ValueObject> e) {
            assert e != null;
            assert e.getKey() != null;
            assert e.getValue() != null;

            STORE_MAP.put(e.getKey(), e.getValue());
        }

        /** {@inheritDoc} */
        @Override public void delete(Object key) {
            assert key != null;

            STORE_MAP.remove(key);
        }
    }

    /**
     * Value object class.
     */
    private static class ValueObject {
        /** Value. */
        @GridCacheQuerySqlField
        private final int val;

        /**
         * @param val Value.
         */
        ValueObject(int val) {
            this.val = val;
        }

        /**
         * @return Value.
         */
        int value() {
            return val;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ValueObject.class, this);
        }
    }
}
/*
 * Origin of this file (commit-mail patch header, kept verbatim):
 *
 * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java
 * ----------------------------------------------------------------------
 * diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java
 * new file mode 100644
 * index 0000000..3f7e6b0
 * --- /dev/null
 * +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java
 * @@ -0,0 +1,161 @@
 */

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;

import java.util.*;

import static org.apache.ignite.cache.GridCacheMode.*;
import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;

/**
 * Tests for cache query metrics.
 * <p>
 * Both tests execute the same SQL query twice and check the metrics after each execution: once
 * through the metrics exposed by {@code cache.queries()} and once through the metrics of the query
 * object itself. {@link #GRID_CNT} grids are started before and stopped after every test.
 */
public class GridCacheQueryMetricsSelfTest extends GridCommonAbstractTest {
    /** Number of grids started for every test. */
    private static final int GRID_CNT = 2;

    /** Cache mode. */
    private static final GridCacheMode CACHE_MODE = REPLICATED;

    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        startGridsMultiThreaded(GRID_CNT);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(IP_FINDER);

        cfg.setDiscoverySpi(disco);

        CacheConfiguration cacheCfg = defaultCacheConfiguration();

        cacheCfg.setCacheMode(CACHE_MODE);
        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);

        GridCacheQueryConfiguration qcfg = new GridCacheQueryConfiguration();

        qcfg.setIndexPrimitiveKey(true);

        cacheCfg.setQueryConfiguration(qcfg);

        cfg.setCacheConfiguration(cacheCfg);

        return cfg;
    }

    /**
     * Logs the given metrics and verifies them: the expected number of executions, no failures and
     * non-negative average, maximum and minimum times.
     *
     * @param m Metrics to check, must not be {@code null}.
     * @param expExecs Expected number of executions.
     */
    private void checkMetrics(GridCacheQueryMetrics m, int expExecs) {
        assertNotNull(m);

        info("Metrics: " + m);

        assertEquals(expExecs, m.executions());
        assertEquals(0, m.fails());
        assertTrue(m.averageTime() >= 0);
        assertTrue(m.maximumTime() >= 0);
        assertTrue(m.minimumTime() >= 0);
    }

    /**
     * Checks the metrics obtained from {@code cache.queries().metrics()} after each of two executions
     * of the same query.
     *
     * @throws Exception In case of error.
     */
    public void testAccumulativeMetrics() throws Exception {
        GridCache<String, Integer> cache = cache(0);

        GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "_val >= 0")
            .projection(grid(0));

        // Execute query.
        qry.execute().get();

        checkMetrics(cache.queries().metrics(), 1);

        // Execute again with the same parameters.
        qry.execute().get();

        checkMetrics(cache.queries().metrics(), 2);
    }

    /**
     * Checks the metrics obtained from the query object itself ({@code qry.metrics()}) after each of
     * two executions.
     *
     * @throws Exception In case of error.
     */
    public void testSingleQueryMetrics() throws Exception {
        GridCache<String, Integer> cache = cache(0);

        GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "_val >= 0")
            .projection(grid(0));

        // Execute.
        qry.execute().get();

        checkMetrics(qry.metrics(), 1);

        // Execute.
        qry.execute().get();

        checkMetrics(qry.metrics(), 2);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java new file mode 100644 index 0000000..8b4185d --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java @@ -0,0 +1,859 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.processors.query.h2.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Multi-threaded tests for cache queries. + */ +@SuppressWarnings("StatementWithEmptyBody") +public class GridCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static final boolean TEST_INFO = true; + + /** Number of test grids (nodes). Should not be less than 2. */ + private static final int GRID_CNT = 2; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static AtomicInteger idxSwapCnt = new AtomicInteger(); + + /** */ + private static AtomicInteger idxUnswapCnt = new AtomicInteger(); + + /** */ + private static final long DURATION = 30 * 1000; + + /** Don't start grid by default. 
*/ + public GridCacheQueryMultiThreadedSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(GridCacheDistributionMode.NEAR_PARTITIONED); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setSwapEnabled(true); + cacheCfg.setBackups(1); + cacheCfg.setEvictionPolicy(evictsEnabled() ? new GridCacheLruEvictionPolicy(100) : null); + + GridCacheQueryConfiguration qcfg = new GridCacheQueryConfiguration(); + + qcfg.setIndexPrimitiveKey(true); + + cacheCfg.setQueryConfiguration(qcfg); + + if (offheapEnabled() && evictsEnabled()) + cacheCfg.setOffHeapMaxMemory(1000); // Small offheap for evictions. 
+ + cfg.setCacheConfiguration(cacheCfg); + + GridQueryConfiguration indexing = new GridQueryConfiguration(); + + indexing.setMaxOffheapRowsCacheSize(128); + + if (offheapEnabled()) + indexing.setMaxOffHeapMemory(0); + + cfg.setQueryConfiguration(indexing); + + GridQueryProcessor.idxCls = FakeIndexing.class; + + return cfg; + } + + /** + * + */ + private static class FakeIndexing extends GridH2Indexing { + @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException { + super.onSwap(spaceName, key); + + idxSwapCnt.incrementAndGet(); + } + + @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) + throws IgniteCheckedException { + super.onUnswap(spaceName, key, val, valBytes); + + idxUnswapCnt.incrementAndGet(); + } + } + + /** @return {@code true} If offheap enabled. */ + protected boolean offheapEnabled() { + return false; + } + + /** @return {@code true} If evictions enabled. */ + protected boolean evictsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + // Clean up all caches. + for (int i = 0; i < GRID_CNT; i++) { + GridCache<Object, Object> c = grid(i).cache(null); + + assertEquals(0, c.size()); + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or equal to 2."; + + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + if (evictsEnabled()) { + assertTrue(idxSwapCnt.get() > 0); + assertTrue(idxUnswapCnt.get() > 0); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + // Clean up all caches. 
+ for (int i = 0; i < GRID_CNT; i++) { + GridCache<Object, Object> c = grid(i).cache(null); + + c.removeAll(F.<GridCacheEntry<Object, Object>>alwaysTrue()); + + Iterator<Map.Entry<Object, Object>> it = c.swapIterator(); + + while (it.hasNext()) { + it.next(); + + it.remove(); + } + + it = c.offHeapIterator(); + + while (it.hasNext()) { + it.next(); + + it.remove(); + } + + assertEquals("Swap keys: " + c.swapKeys(), 0, c.swapKeys()); + assertEquals(0, c.offHeapEntriesCount()); + assertEquals(0, c.size()); + } + } + + /** {@inheritDoc} */ + @Override protected void info(String msg) { + if (TEST_INFO) + super.info(msg); + } + + /** + * @param entries Entries. + * @param g Grid. + * @return Affinity nodes. + */ + private Set<UUID> affinityNodes(Iterable<Map.Entry<Integer, Integer>> entries, Ignite g) { + Set<UUID> nodes = new HashSet<>(); + + for (Map.Entry<Integer, Integer> entry : entries) + nodes.add(g.cache(null).affinity().mapKeyToPrimaryAndBackups(entry.getKey()).iterator().next().id()); + + return nodes; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapString() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. 
+ final GridCache<Integer, String> c = g.cache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c.queries().createSqlQuery(String.class, "1 = 1").execute().get().size()); + assertEquals(0, c.queries().createSqlQuery(Long.class, "1 = 1").execute().get().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.putx(i, String.valueOf(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + assertTrue(c.evict(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + switch (rnd.nextInt(5)) { + case 0: + c.putx(rnd.nextInt(keyCnt), String.valueOf(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.evict(rnd.nextInt(keyCnt)); + + break; + case 2: + c.remove(rnd.nextInt(keyCnt)); + + break; + case 3: + c.get(rnd.nextInt(keyCnt)); + + break; + case 4: + GridCacheQuery<Map.Entry<Integer, String>> qry = c.queries().createSqlQuery( + String.class, "_val between ? and ?"); + + int from = rnd.nextInt(valCnt); + + GridCacheQueryFuture<Map.Entry<Integer, String>> fut = + qry.execute(String.valueOf(from), String.valueOf(from + 250)); + + Collection<Map.Entry<Integer, String>> res = fut.get(); + + for (Map.Entry<Integer, String> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapLong() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. 
+ final GridCache<Integer, Long> c = g.cache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c.queries().createSqlQuery(String.class, "1 = 1").execute().get().size()); + assertEquals(0, c.queries().createSqlQuery(Long.class, "1 = 1").execute().get().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.putx(i, (long)rnd.nextInt(valCnt)); + + if (evictsEnabled() && rnd.nextBoolean()) + assertTrue(c.evict(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: + c.putx(key, (long)rnd.nextInt(valCnt)); + + break; + case 1: + if (evictsEnabled()) + c.evict(key); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + GridCacheQuery<Map.Entry<Integer, Long>> qry = c.queries().createSqlQuery( + Long.class, + "_val between ? and ?"); + + int from = rnd.nextInt(valCnt); + + GridCacheQueryFuture<Map.Entry<Integer, Long>> f = qry.execute(from, from + 250); + + Collection<Map.Entry<Integer, Long>> res = f.get(); + + for (Map.Entry<Integer, Long> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapLongString() throws Exception { + int threadCnt = 150; + final int keyCnt = 2000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. 
+ final GridCache<Integer, Object> c = g.cache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c.offHeapEntriesCount()); +// assertEquals(0, c.swapKeys()); + assertEquals(0, c.queries().createSqlQuery(String.class, "1 = 1").execute().get().size()); + assertEquals(0, c.queries().createSqlQuery(Long.class, "1 = 1").execute().get().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.putx(i, rnd.nextBoolean() ? (long)rnd.nextInt(valCnt) : String.valueOf(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + assertTrue(c.evict(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: + c.putx(key, rnd.nextBoolean() ? (long)rnd.nextInt(valCnt) : + String.valueOf(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.evict(key); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + GridCacheQuery<Map.Entry<Integer, Object>> qry = c.queries().createSqlQuery( + rnd.nextBoolean() ? Long.class : String.class, + "_val between ? and ?"); + + int from = rnd.nextInt(valCnt); + + GridCacheQueryFuture<Map.Entry<Integer, Object>> f = qry.execute(from, from + 250); + + Collection<Map.Entry<Integer, Object>> res = f.get(); + + for (Map.Entry<Integer, Object> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * Fills the cache with {@code TestValue} objects, then runs concurrent random puts, evictions (only when + * {@code evictsEnabled()} returns true), removes, gets and SQL range queries until {@code DURATION} has elapsed. + * <p> + * NOTE(review): the queries in this test are created for {@code String.class} and {@code Long.class} although + * the cache holds {@code TestValue} and the range clause references {@code TestValue.val} - confirm intent. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSwapUnswapObject() throws Exception { + int threadCnt = 50; + final int keyCnt = 4000; + final int valCnt = 10000; + + final Ignite g = grid(0); + + // Put test values into cache. 
+ final GridCache<Integer, TestValue> c = g.cache(null); + + assertEquals(0, g.cache(null).size()); + assertEquals(0, c.queries().createSqlQuery(String.class, "1 = 1").execute().get().size()); + assertEquals(0, c.queries().createSqlQuery(Long.class, "1 = 1").execute().get().size()); + + Random rnd = new Random(); + + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { + c.putx(i, new TestValue(rnd.nextInt(valCnt))); + + if (evictsEnabled() && rnd.nextBoolean()) + assertTrue(c.evict(i)); + } + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + Random rnd = new Random(); + + while (!done.get()) { + int key = rnd.nextInt(keyCnt); + + switch (rnd.nextInt(5)) { + case 0: + c.putx(key, new TestValue(rnd.nextInt(valCnt))); + + break; + case 1: + if (evictsEnabled()) + c.evict(key); + + break; + case 2: + c.remove(key); + + break; + case 3: + c.get(key); + + break; + case 4: + GridCacheQuery<Map.Entry<Integer, TestValue>> qry = c.queries().createSqlQuery( + Long.class, "TestValue.val between ? and ?"); + + int from = rnd.nextInt(valCnt); + + GridCacheQueryFuture<Map.Entry<Integer, TestValue>> f = qry.execute(from, from + 250); + + Collection<Map.Entry<Integer, TestValue>> res = f.get(); + + for (Map.Entry<Integer, TestValue> ignored : res) { + //No-op. + } + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * Puts and evicts {@code keyCnt} entries, then executes one shared SQL query object from many threads and + * checks that every execution returns all {@code keyCnt} entries. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSameQuery() throws Exception { + int threadCnt = 50; + final int keyCnt = 10; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. 
+ GridCache<Integer, Integer> c = g.cache(null); + + for (int i = 0; i < keyCnt; i++) { + c.putx(i, i); + + info("Affinity [key=" + i + ", aff=" + c.affinity().mapKeyToPrimaryAndBackups(i).iterator().next().id() + ']'); + + assertTrue(c.evict(i)); + } + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + final GridCacheQuery<Map.Entry<Integer, Integer>> qry = c.queries().createSqlQuery(Integer.class, "_val >= 0"); + + IgniteFuture<?> fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + GridCacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute(); + + Collection<Map.Entry<Integer, Integer>> entries = fut.get(); + + assert entries != null; + + assertEquals("Query results [entries=" + entries + ", aff=" + affinityNodes(entries, g) + + ", iteration=" + iter + ']', keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = + ((GridKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + info("Finishing test..."); + + done.set(true); + + fut.get(); + } + + /** + * Puts and evicts {@code keyCnt} entries, then has every thread repeatedly create and execute a new SQL query, + * checking that each execution returns all {@code keyCnt} entries. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedNewQueries() throws Exception { + int threadCnt = 50; + final int keyCnt = 10; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. 
+ final GridCache<Integer, Integer> c = g.cache(null); + + for (int i = 0; i < keyCnt; i++) { + c.putx(i, i); + + assertTrue(c.evict(i)); + } + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + GridCacheQuery<Map.Entry<Integer, Integer>> qry = + c.queries().createSqlQuery(Integer.class, "_val >= 0"); + + GridCacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute(); + + Collection<Map.Entry<Integer, Integer>> entries = fut.get(); + + assert entries != null; + + assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = + ((GridKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * Executes one shared SQL query with a key-summing reducer from many threads and checks that + * {@code GRID_CNT} reduced values are returned, each equal to 5 (the sum of the matching keys 2 and 3). + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedReduceQuery() throws Exception { + int threadCnt = 50; + int keyCnt = 10; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. + GridCache<Integer, Integer> c = g.cache(null); + + for (int i = 0; i < keyCnt; i++) + c.putx(i, i); + + final GridCacheQuery<Map.Entry<Integer, Integer>> rdcQry = + c.queries().createSqlQuery(Integer.class, "_val > 1 and _val < 4"); + + rdcQry.includeBackups(true); + rdcQry.keepAll(true); + + final IgniteReducer<Map.Entry<Integer, Integer>, Integer> rmtRdc = + new IgniteReducer<Map.Entry<Integer, Integer>, Integer>() { + /** Reducer result. 
+ */ + private int res; + + @Override public boolean collect(Map.Entry<Integer, Integer> e) { + res += e.getKey(); + + return true; + } + + @Override public Integer reduce() { + return res; + } + }; + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteFuture<?> fut = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!stop.get()) { + Collection<Integer> rmtVals = rdcQry.execute(rmtRdc).get(); + + assertEquals(GRID_CNT, rmtVals.size()); + + Iterator<Integer> reduceIter = rmtVals.iterator(); + + assert reduceIter != null; + + for (int i = 0; i < GRID_CNT; i++) { + assert reduceIter.hasNext(); + + assertEquals(Integer.valueOf(5), reduceIter.next()); + } + + Collection<Integer> res = rdcQry.execute(rmtRdc).get(); + + int val = F.sumInt(res); + + int expVal = 5 * GRID_CNT; + + assertEquals(expVal, val); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = + ((GridKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + stop.set(true); + + fut.get(); + } + + /** + * Executes one shared scan query (created with a {@code null} argument) from many threads and checks that + * every execution returns all {@code keyCnt} entries. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedScanQuery() throws Exception { + int threadCnt = 50; + final int keyCnt = 500; + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. 
+ GridCache<Integer, Integer> c = g.cache(null); + + for (int i = 0; i < keyCnt; i++) + c.putx(i, i); + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + final GridCacheQuery<Map.Entry<Integer, Integer>> qry = c.queries().createScanQuery(null); + + IgniteFuture<?> fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + GridCacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute(); + + Collection<Map.Entry<Integer, Integer>> entries = fut.get(); + + assert entries != null; + + assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager<Object, Object> qryMgr = + ((GridKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** + * Serializable test value wrapping a single {@code int} whose field carries the SQL field annotation. + */ + private static class TestValue implements Serializable { + /** Value. */ + @GridCacheQuerySqlField + private int val; + + /** + * @param val Value. + */ + private TestValue(int val) { + this.val = val; + } + + /** + * @return Value. 
+ */ + public int value() { + return val; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapEvictsMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapEvictsMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapEvictsMultiThreadedSelfTest.java new file mode 100644 index 0000000..e815ae5 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapEvictsMultiThreadedSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * Multi-threaded tests for cache queries. 
+ */ +public class GridCacheQueryOffheapEvictsMultiThreadedSelfTest extends GridCacheQueryOffheapMultiThreadedSelfTest { + /** + * {@inheritDoc} + * <p> + * Always returns {@code true}, so inherited tests that consult {@code evictsEnabled()} perform their + * eviction steps. + */ + @Override protected boolean evictsEnabled() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapMultiThreadedSelfTest.java new file mode 100644 index 0000000..35bf001 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryOffheapMultiThreadedSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * Queries over off-heap indexes. 
+ */ +public class GridCacheQueryOffheapMultiThreadedSelfTest extends GridCacheQueryMultiThreadedSelfTest { + /** + * {@inheritDoc} + * <p> + * Always returns {@code true}. NOTE(review): presumably consulted by the base test when configuring the + * cache to turn on off-heap storage - confirm in the superclass. + */ + @Override protected boolean offheapEnabled() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryTestValue.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryTestValue.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryTestValue.java new file mode 100644 index 0000000..55c2fd9 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryTestValue.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.query.*; +import org.gridgain.grid.kernal.processors.cache.*; + +import java.io.*; + +/** + * Query test value. 
+ */ +@SuppressWarnings("unused") +public class GridCacheQueryTestValue implements Serializable { + /** String field annotated as both a text-query field and a SQL field named {@code fieldname}. */ + @GridCacheQueryTextField + @GridCacheQuerySqlField(name = "fieldname") + private String field1; + + /** Int field without a field-level annotation; its getter carries the SQL field annotation. */ + private int field2; + + /** SQL field declared with {@code unique = true}. */ + @GridCacheQuerySqlField(unique = true) + private long field3; + + /** SQL field belonging to ordered groups {@code grp1} (order 1) and {@code grp2} (order 2). */ + @GridCacheQuerySqlField(orderedGroups = { + @GridCacheQuerySqlField.Group(name = "grp1", order = 1), + @GridCacheQuerySqlField.Group(name = "grp2", order = 2)}) + private long field4; + + /** SQL field belonging to ordered group {@code grp1} (order 2). */ + @GridCacheQuerySqlField(orderedGroups = {@GridCacheQuerySqlField.Group(name = "grp1", order = 2)}) + private long field5; + + /** Embedded value; SQL field belonging to ordered group {@code grp1} (order 3). */ + @GridCacheQuerySqlField(orderedGroups = {@GridCacheQuerySqlField.Group(name = "grp1", order = 3)}) + private GridCacheQueryEmbeddedValue field6 = new GridCacheQueryEmbeddedValue(); + + /** + * + * @return Value of {@code field1}. + */ + public String getField1() { + return field1; + } + + /** + * + * @param field1 New value of {@code field1}. + */ + public void setField1(String field1) { + this.field1 = field1; + } + + /** + * + * @return Value of {@code field2}. + */ + @GridCacheQuerySqlField + public int getField2() { + return field2; + } + + /** + * + * @param field2 New value of {@code field2}. + */ + public void setField2(int field2) { + this.field2 = field2; + } + + /** + * + * @return Value of {@code field3}. + */ + public long getField3() { + return field3; + } + + /** + * + * @param field3 New value of {@code field3}. + */ + public void setField3(long field3) { + this.field3 = field3; + } + + /** + * + * @return Value of {@code field4}. + */ + public long getField4() { + return field4; + } + + /** + * + * @param field4 New value of {@code field4}. + */ + public void setField4(long field4) { + this.field4 = field4; + } + + /** + * @return Value of {@code field5}. + */ + public long getField5() { + return field5; + } + + /** + * @param field5 New value of {@code field5}. 
+ */ + public void setField5(long field5) { + this.field5 = field5; + } + + /** + * {@inheritDoc} + * <p> + * Compares only {@code field1}, {@code field2} and {@code field3}; {@code field4}, {@code field5} and + * {@code field6} do not take part in equality. + */ + @SuppressWarnings({"RedundantIfStatement"}) + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + GridCacheQueryTestValue that = (GridCacheQueryTestValue)o; + + if (field2 != that.field2) + return false; + + if (field3 != that.field3) + return false; + + if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) + return false; + + return true; + } + + /** + * {@inheritDoc} + * <p> + * Combines the same three fields that {@code equals} compares: {@code field1}, {@code field2}, {@code field3}. + */ + @Override public int hashCode() { + int res = (field1 != null ? field1.hashCode() : 0); + + res = 31 * res + field2; + res = 31 * res + (int)(field3 ^ (field3 >>> 32)); + + return res; + } + + /** + * @param field6 Embedded value. + */ + public void setField6(GridCacheQueryEmbeddedValue field6) { + this.field6 = field6; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java new file mode 100644 index 0000000..5238b4f --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.grid.kernal.processors.cache.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Multithreaded reduce query tests with lots of data. 
+ */ +public class GridCacheReduceQueryMultithreadedSelfTest extends GridCacheAbstractSelfTest { + /** Number of grids, returned from {@code gridCount()}. */ + private static final int GRID_CNT = 5; + + /** Test timeout (2 * 60 * 1000, presumably milliseconds); also passed as the query timeout. */ + private static final int TEST_TIMEOUT = 2 * 60 * 1000; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return c; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheMode(PARTITIONED); + cfg.setBackups(1); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + GridCacheQueryConfiguration qcfg = new GridCacheQueryConfiguration(); + + qcfg.setIndexPrimitiveKey(true); + + cfg.setQueryConfiguration(qcfg); + + return cfg; + } + + /** + * Stores entries from one thread while a second thread keeps executing a reduce query that sums the values, + * asserting that every reduced sum is positive. The querying thread starts only after the first put. + * + * @throws Exception In case of error. + */ + public void testReduceQuery() throws Exception { + final int keyCnt = 5000; + final int logFreq = 500; + + final GridCache<String, Integer> c = cache(); + + final CountDownLatch startLatch = new CountDownLatch(1); + + IgniteFuture<?> fut1 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + for (int i = 1; i < keyCnt; i++) { + assertTrue(c.putx(String.valueOf(i), i)); + + startLatch.countDown(); + + if (i % logFreq == 0) + info("Stored entries: " + i); + } + + return null; + } + }, 1); + + // Create query. 
+ final GridCacheQuery<Map.Entry<String, Integer>> sumQry = + c.queries().createSqlQuery(Integer.class, "_val > 0").timeout(TEST_TIMEOUT); + + final R1<Map.Entry<String, Integer>, Integer> rmtRdc = new R1<Map.Entry<String, Integer>, Integer>() { + /** + * Running sum of collected values. NOTE(review): never reset, and the same reducer instance is passed + * to every {@code execute} call - confirm whether it is copied per execution. + */ + private AtomicInteger sum = new AtomicInteger(); + + @Override public boolean collect(Map.Entry<String, Integer> e) { + sum.addAndGet(e.getValue()); + + return true; + } + + @Override public Integer reduce() { + return sum.get(); + } + }; + + final AtomicBoolean stop = new AtomicBoolean(); + + startLatch.await(); + + IgniteFuture<?> fut2 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + int cnt = 0; + + while (!stop.get()) { + Collection<Integer> res = sumQry.execute(rmtRdc).get(); + + int sum = F.sumInt(res); + + cnt++; + + assertTrue(sum > 0); + + if (cnt % logFreq == 0) { + info("Reduced value: " + sum); + info("Executed queries: " + cnt); + } + } + + return null; + } + }, 1); + + fut1.get(); + + stop.set(true); + + fut2.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39bc4257/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSqlQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSqlQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSqlQueryMultiThreadedSelfTest.java new file mode 100644 index 0000000..a1acfd5 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSqlQueryMultiThreadedSelfTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Runs the same paged SQL query concurrently from multiple threads on a two-grid partitioned cache and checks + * the number of returned rows. + */ +public class GridCacheSqlQueryMultiThreadedSelfTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every grid configured by this test. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setDistributionMode(PARTITIONED_ONLY); + ccfg.setQueryIndexEnabled(true); + ccfg.setBackups(1); + 
ccfg.setAtomicityMode(TRANSACTIONAL); + + c.setCacheConfiguration(ccfg); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(2); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * Puts 2000 {@code Person} entries, then runs the query task with a count of 16 (presumably threads); each + * task executes the SQL query 100 times with page size 50 and expects 2000 rows per execution. + * + * @throws Exception If failed. + */ + public void testQuery() throws Exception { + final GridCache<Integer, Person> cache = grid(0).cache(null); + + for (int i = 0; i < 2000; i++) + cache.put(i, new Person(i)); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < 100; i++) { + GridCacheQuery<Map.Entry<Integer, Person>> qry = + cache.queries().createSqlQuery("Person", "age >= 0"); + + qry.includeBackups(false); + qry.enableDedup(true); + qry.keepAll(true); + qry.pageSize(50); + + GridCacheQueryFuture<Map.Entry<Integer, Person>> fut = qry.execute(); + + int cnt = 0; + + while (fut.next() != null) + cnt++; + + assertEquals(2000, cnt); + } + + return null; + } + }, 16, "test"); + } + + /** + * Serializable cache value with a single {@code age} field annotated as a SQL field. + */ + private static class Person implements Serializable { + /** Age. */ + @GridCacheQuerySqlField + private int age; + + /** + * @param age Age. + */ + Person(int age) { + this.age = age; + } + + /** + * @return Age. + */ + public int age() { + return age; + } + } +}