#ignite-565: Revert GridCacheReduceQueryMultithreadedSelfTest. Change signature of 
IgniteCache.clearAll to clearAll(Set<? extends K> keys).


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1ab6d387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1ab6d387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1ab6d387

Branch: refs/heads/sprint-3
Commit: 1ab6d38727b67a688ecca5bab4a59a8fa089336d
Parents: 2b788cc
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Thu Mar 26 15:16:28 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Thu Mar 26 15:16:28 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |   2 +-
 .../processors/cache/CacheProjection.java       |   6 +-
 .../processors/cache/GridCacheAdapter.java      |  10 +-
 .../cache/GridCacheProjectionImpl.java          |   6 +-
 .../processors/cache/GridCacheProxyImpl.java    |   6 +-
 .../processors/cache/IgniteCacheProxy.java      |   2 +-
 ...idCacheReduceQueryMultithreadedSelfTest.java | 155 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   1 +
 8 files changed, 172 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 4873fcb..5c5bb25 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -378,7 +378,7 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
      * @throws CacheException        if there is a problem during the clear
      */
     @IgniteAsyncSupported
-    public void clearAll(Set<K> keys);
+    public void clearAll(Set<? extends K> keys);
 
     /**
      * Clear entry from the cache and swap storage, without notifying 
listeners or

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
index 6659735..558f8af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
@@ -1268,7 +1268,7 @@ public interface CacheProjection<K, V> extends 
Iterable<Cache.Entry<K, V>> {
      *
      * @param keys Keys to clearLocally.
      */
-    public void clearLocallyAll(Set<K> keys);
+    public void clearLocallyAll(Set<? extends K> keys);
 
     /**
      * Clears key on all nodes that store it's data. That is, caches are 
cleared on remote
@@ -1294,7 +1294,7 @@ public interface CacheProjection<K, V> extends 
Iterable<Cache.Entry<K, V>> {
      * @param keys Keys to clear.
      * @throws IgniteCheckedException In case of cache could not be cleared on 
any of the nodes.
      */
-    public void clearAll(Set<K> keys) throws IgniteCheckedException;
+    public void clearAll(Set<? extends K> keys) throws IgniteCheckedException;
 
     /**
      * Clears cache on all nodes that store it's data. That is, caches are 
cleared on remote
@@ -1326,7 +1326,7 @@ public interface CacheProjection<K, V> extends 
Iterable<Cache.Entry<K, V>> {
      * @param keys Keys to clear.
      * @return Clear future.
      */
-    public IgniteInternalFuture<?> clearAsync(Set<K> keys);
+    public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys);
 
     /**
      * Clears cache on all nodes that store it's data. That is, caches are 
cleared on remote

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cd0aec0..1973330 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1350,7 +1350,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public void clearLocallyAll(Set<K> keys) {
+    @Override public void clearLocallyAll(Set<? extends K> keys) {
         clearLocally0(keys);
     }
 
@@ -1458,7 +1458,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public void clearAll(Set<K> keys) throws IgniteCheckedException {
+    @Override public void clearAll(Set<? extends K> keys) throws 
IgniteCheckedException {
         // Clear local cache synchronously.
         clearLocallyAll(keys);
 
@@ -1471,7 +1471,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) {
+    @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) 
{
         return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
     }
 
@@ -5640,7 +5640,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
         private static final long serialVersionUID = 0L;
 
         /** Keys to remove. */
-        private Set<K> keys;
+        private Set<? extends K> keys;
 
         /**
          * Empty constructor for serialization.
@@ -5653,7 +5653,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
          * @param cacheName Cache name.
          * @param keys Keys to clear.
          */
-        private GlobalClearKeySetCallable(String cacheName, Set<K> keys) {
+        private GlobalClearKeySetCallable(String cacheName, Set<? extends K> 
keys) {
             super(cacheName);
 
             this.keys = keys;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index 5c05f30..4ab187d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -875,7 +875,7 @@ public class GridCacheProjectionImpl<K, V> implements 
GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void clearLocallyAll(Set<K> keys) {
+    @Override public void clearLocallyAll(Set<? extends K> keys) {
         cache.clearLocallyAll(keys);
     }
 
@@ -890,7 +890,7 @@ public class GridCacheProjectionImpl<K, V> implements 
GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void clearAll(Set<K> keys) throws IgniteCheckedException {
+    @Override public void clearAll(Set<? extends K> keys) throws 
IgniteCheckedException {
         cache.clearAll(keys);
     }
 
@@ -900,7 +900,7 @@ public class GridCacheProjectionImpl<K, V> implements 
GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) {
+    @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) 
{
         return cache.clearAsync(keys);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 772a3b7..1c25ef0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1288,7 +1288,7 @@ public class GridCacheProxyImpl<K, V> implements 
GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) {
+    @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) 
{
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -1324,7 +1324,7 @@ public class GridCacheProxyImpl<K, V> implements 
GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public void clearLocallyAll(Set<K> keys) {
+    @Override public void clearLocallyAll(Set<? extends K> keys) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -1348,7 +1348,7 @@ public class GridCacheProxyImpl<K, V> implements 
GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public void clearAll(Set<K> keys) throws IgniteCheckedException {
+    @Override public void clearAll(Set<? extends K> keys) throws 
IgniteCheckedException {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 22bfe39..2e1deaa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1024,7 +1024,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void clearAll(Set<K> keys) {
+    @Override public void clearAll(Set<? extends K> keys) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
new file mode 100644
index 0000000..2dfa542
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.marshaller.optimized.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Multithreaded reduce query tests with lots of data.
+ */
+public class GridCacheReduceQueryMultithreadedSelfTest extends 
GridCacheAbstractSelfTest {
+    /** */
+    private static final int GRID_CNT = 5;
+
+    /** */
+    private static final int TEST_TIMEOUT = 2 * 60 * 1000;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        c.setMarshaller(new OptimizedMarshaller(false));
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) 
throws Exception {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setBackups(1);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setIndexedTypes(
+            String.class, Integer.class
+        );
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testReduceQuery() throws Exception {
+        final int keyCnt = 5000;
+        final int logFreq = 500;
+
+        final GridCacheAdapter<String, Integer> c = internalCache(jcache());
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                for (int i = 1; i < keyCnt; i++) {
+                    c.put(String.valueOf(i), i);
+
+                    startLatch.countDown();
+
+                    if (i % logFreq == 0)
+                        info("Stored entries: " + i);
+                }
+
+                return null;
+            }
+        }, 1);
+
+        // Create query.
+        final CacheQuery<Map.Entry<String, Integer>> sumQry =
+            c.queries().createSqlQuery(Integer.class, "_val > 
0").timeout(TEST_TIMEOUT);
+
+        final R1<Map.Entry<String, Integer>, Integer> rmtRdc = new 
R1<Map.Entry<String, Integer>, Integer>() {
+            /** */
+            private AtomicInteger sum = new AtomicInteger();
+
+            @Override public boolean collect(Map.Entry<String, Integer> e) {
+                sum.addAndGet(e.getValue());
+
+                return true;
+            }
+
+            @Override public Integer reduce() {
+                return sum.get();
+            }
+        };
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        startLatch.await();
+
+        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                int cnt = 0;
+
+                while (!stop.get()) {
+                    Collection<Integer> res = sumQry.execute(rmtRdc).get();
+
+                    int sum = F.sumInt(res);
+
+                    cnt++;
+
+                    assertTrue(sum > 0);
+
+                    if (cnt % logFreq == 0) {
+                        info("Reduced value: " + sum);
+                        info("Executed queries: " + cnt);
+                    }
+                }
+
+                return null;
+            }
+        }, 1);
+
+        fut1.get();
+
+        stop.set(true);
+
+        fut2.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index dc9a480..e29af9f 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -62,6 +62,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         
suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class);
         suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
 //        suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); TODO 
IGNITE-484
+        suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class);
         suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);
 
         // Fields queries.

Reply via email to