#ignite-565: Revert GridCacheReduceQueryMultithreadedSelfTest. Change signature to IgniteCache.clearAll(Set<? extends K> keys).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1ab6d387 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1ab6d387 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1ab6d387 Branch: refs/heads/sprint-3 Commit: 1ab6d38727b67a688ecca5bab4a59a8fa089336d Parents: 2b788cc Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Mar 26 15:16:28 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Mar 26 15:16:28 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 2 +- .../processors/cache/CacheProjection.java | 6 +- .../processors/cache/GridCacheAdapter.java | 10 +- .../cache/GridCacheProjectionImpl.java | 6 +- .../processors/cache/GridCacheProxyImpl.java | 6 +- .../processors/cache/IgniteCacheProxy.java | 2 +- ...idCacheReduceQueryMultithreadedSelfTest.java | 155 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 1 + 8 files changed, 172 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 4873fcb..5c5bb25 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -378,7 +378,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @throws CacheException if there is a problem during the clear */ @IgniteAsyncSupported - public void clearAll(Set<K> keys); + public void clearAll(Set<? 
extends K> keys); /** * Clear entry from the cache and swap storage, without notifying listeners or http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java index 6659735..558f8af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java @@ -1268,7 +1268,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * * @param keys Keys to clearLocally. */ - public void clearLocallyAll(Set<K> keys); + public void clearLocallyAll(Set<? extends K> keys); /** * Clears key on all nodes that store it's data. That is, caches are cleared on remote @@ -1294,7 +1294,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @param keys Keys to clear. * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. */ - public void clearAll(Set<K> keys) throws IgniteCheckedException; + public void clearAll(Set<? extends K> keys) throws IgniteCheckedException; /** * Clears cache on all nodes that store it's data. That is, caches are cleared on remote @@ -1326,7 +1326,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @param keys Keys to clear. * @return Clear future. */ - public IgniteInternalFuture<?> clearAsync(Set<K> keys); + public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys); /** * Clears cache on all nodes that store it's data. 
That is, caches are cleared on remote http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index cd0aec0..1973330 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1350,7 +1350,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public void clearLocallyAll(Set<K> keys) { + @Override public void clearLocallyAll(Set<? extends K> keys) { clearLocally0(keys); } @@ -1458,7 +1458,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public void clearAll(Set<K> keys) throws IgniteCheckedException { + @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException { // Clear local cache synchronously. clearLocallyAll(keys); @@ -1471,7 +1471,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) { + @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) { return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys)); } @@ -5640,7 +5640,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, private static final long serialVersionUID = 0L; /** Keys to remove. */ - private Set<K> keys; + private Set<? extends K> keys; /** * Empty constructor for serialization. 
@@ -5653,7 +5653,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param cacheName Cache name. * @param keys Keys to clear. */ - private GlobalClearKeySetCallable(String cacheName, Set<K> keys) { + private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) { super(cacheName); this.keys = keys; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 5c05f30..4ab187d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -875,7 +875,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public void clearLocallyAll(Set<K> keys) { + @Override public void clearLocallyAll(Set<? extends K> keys) { cache.clearLocallyAll(keys); } @@ -890,7 +890,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public void clearAll(Set<K> keys) throws IgniteCheckedException { + @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException { cache.clearAll(keys); } @@ -900,7 +900,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) { + @Override public IgniteInternalFuture<?> clearAsync(Set<? 
extends K> keys) { return cache.clearAsync(keys); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 772a3b7..1c25ef0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1288,7 +1288,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) { + @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1324,7 +1324,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public void clearLocallyAll(Set<K> keys) { + @Override public void clearLocallyAll(Set<? extends K> keys) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1348,7 +1348,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public void clearAll(Set<K> keys) throws IgniteCheckedException { + @Override public void clearAll(Set<? 
extends K> keys) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 22bfe39..2e1deaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1024,7 +1024,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public void clearAll(Set<K> keys) { + @Override public void clearAll(Set<? extends K> keys) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java new file mode 100644 index 0000000..2dfa542 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.marshaller.optimized.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Multithreaded reduce query tests with lots of data. 
+ */ +public class GridCacheReduceQueryMultithreadedSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 5; + + /** */ + private static final int TEST_TIMEOUT = 2 * 60 * 1000; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setMarshaller(new OptimizedMarshaller(false)); + + return c; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheMode(PARTITIONED); + cfg.setBackups(1); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setIndexedTypes( + String.class, Integer.class + ); + + return cfg; + } + + /** + * @throws Exception In case of error. + */ + public void testReduceQuery() throws Exception { + final int keyCnt = 5000; + final int logFreq = 500; + + final GridCacheAdapter<String, Integer> c = internalCache(jcache()); + + final CountDownLatch startLatch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + for (int i = 1; i < keyCnt; i++) { + c.put(String.valueOf(i), i); + + startLatch.countDown(); + + if (i % logFreq == 0) + info("Stored entries: " + i); + } + + return null; + } + }, 1); + + // Create query. 
+ final CacheQuery<Map.Entry<String, Integer>> sumQry = + c.queries().createSqlQuery(Integer.class, "_val > 0").timeout(TEST_TIMEOUT); + + final R1<Map.Entry<String, Integer>, Integer> rmtRdc = new R1<Map.Entry<String, Integer>, Integer>() { + /** */ + private AtomicInteger sum = new AtomicInteger(); + + @Override public boolean collect(Map.Entry<String, Integer> e) { + sum.addAndGet(e.getValue()); + + return true; + } + + @Override public Integer reduce() { + return sum.get(); + } + }; + + final AtomicBoolean stop = new AtomicBoolean(); + + startLatch.await(); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + int cnt = 0; + + while (!stop.get()) { + Collection<Integer> res = sumQry.execute(rmtRdc).get(); + + int sum = F.sumInt(res); + + cnt++; + + assertTrue(sum > 0); + + if (cnt % logFreq == 0) { + info("Reduced value: " + sum); + info("Executed queries: " + cnt); + } + } + + return null; + } + }, 1); + + fut1.get(); + + stop.set(true); + + fut2.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index dc9a480..e29af9f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -62,6 +62,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class); // 
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); TODO IGNITE-484 + suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class); suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class); // Fields queries.