# ignite-373

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ceb28354
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ceb28354
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ceb28354

Branch: refs/heads/ignite-373
Commit: ceb283542744a8a51fadc1ea9f70de1cfd7b024f
Parents: bf78538
Author: sboikov <sboi...@gridgain.com>
Authored: Tue May 12 14:53:53 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue May 12 14:57:05 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedCacheAdapter.java            |  43 +++---
 .../CacheContinuousQueryRestartSelfTest.java    | 145 -------------------
 ...ridCacheContinuousQueryAbstractSelfTest.java |   4 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   1 -
 4 files changed, 21 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceb28354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 91b7be8..d10ab56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -314,33 +314,31 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
                     dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
 
-                    for (int part = 0; part < dht.affinity().partitions(); ++part) {
-                        if (ctx.affinity().belongs(ctx.localNode(), part, topVer)) {
-                            GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
-
-                            if (locPart == null || locPart.state() != OWNING || !locPart.reserve())
-                                return false;
-
-                            try {
-                                if (!locPart.isEmpty() && locPart.primary(topVer)) {
-                                    for (GridDhtCacheEntry o : locPart.entries()) {
-                                        if (!o.obsoleteOrDeleted())
-                                            dataLdr.removeDataInternal(o.key());
-                                    }
-                                }
+                    for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) {
+                        GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
 
-                                GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
-                                    ctx.swap().iterator(part);
+                        if (locPart == null || locPart.state() != OWNING || !locPart.reserve())
+                            return false;
 
-                                if (iter != null) {
-                                    for (Map.Entry<byte[], GridCacheSwapEntry> e : iter)
-                                        dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
+                        try {
+                            if (!locPart.isEmpty()) {
+                                for (GridDhtCacheEntry o : locPart.entries()) {
+                                    if (!o.obsoleteOrDeleted())
+                                        dataLdr.removeDataInternal(o.key());
                                 }
                             }
-                            finally {
-                                locPart.release();
+
+                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                                ctx.swap().iterator(part);
+
+                            if (iter != null) {
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter)
+                                    dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
                             }
                         }
+                        finally {
+                            locPart.release();
+                        }
                     }
                 }
 
@@ -357,9 +355,6 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 ctx.gate().leave();
             }
 
-            if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
-                return false;
-
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceb28354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRestartSelfTest.java
deleted file mode 100644
index bd03489..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRestartSelfTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jsr166.*;
-
-import javax.cache.*;
-import javax.cache.event.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.cache.CacheRebalanceMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- * Continuous queries test.
- */
-public class CacheContinuousQueryRestartSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Grid count. */
-    private static final int GRID_COUNT = 3;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
-        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
-        cacheCfg.setBackups(2);
-        cacheCfg.setRebalanceMode(ASYNC);
-        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-        cacheCfg.setReadThrough(true);
-        cacheCfg.setWriteThrough(true);
-        cacheCfg.setCacheStoreFactory(new GridCacheContinuousQueryAbstractSelfTest.StoreFactory());
-        cacheCfg.setLoadPreviousValue(true);
-
-        cfg.setCacheConfiguration(cacheCfg);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(disco);
-
-        cfg.setMarshaller(new OptimizedMarshaller(false));
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        startGrids(GRID_COUNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNodeJoin() throws Exception {
-        final ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
-
-        final Collection<CacheEntryEvent<? extends Integer, ? extends Integer>> all = new ConcurrentLinkedDeque8<>();
-
-        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
-            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
-                    all.add(evt);
-            }
-        });
-
-        final AtomicInteger id = new AtomicInteger(GRID_COUNT);
-
-        final int keyCount = 1000;
-
-        final Random random = new Random(keyCount);
-
-        IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
-            @Override public void run() {
-                for (int i = 0; i < 2; ++i) {
-                    IgniteCache cache = grid(0).cache(null);
-
-                    int gridId = id.getAndIncrement();
-
-                    try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = grid(0).cache(null).query(qry)) {
-                        cache.put(0, 0);
-
-                        startGrid(gridId);
-
-                        for (int j = 1; j < 40; j++)
-                            cache.put(random.nextInt(), j);
-                    }
-                    catch (Exception e) {
-                        throw new IgniteException(e);
-                    }
-                }
-            }
-        }, 5, "QueryThread");
-
-        fut.get();
-
-        for (int i = 0; i < id.get(); i++) {
-            IgniteCache<Object, Object> cache = grid(i).cache(null);
-
-            cache.removeAll();
-        }
-
-        for (int i = 0; i < GRID_COUNT; i++)
-            assertEquals("Cache is not empty [entrySet=" + grid(i).cache(null).localEntries() +
-                    ", i=" + i + ']', 0, grid(i).cache(null).localSize());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceb28354/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 86f6c9c..5a78f9f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -993,7 +993,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     /**
      *
      */
-    public static class StoreFactory implements Factory<CacheStore> {
+    private static class StoreFactory implements Factory<CacheStore> {
         @Override public CacheStore create() {
             return new TestStore();
         }
@@ -1002,7 +1002,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     /**
      * Store.
      */
-    public static class TestStore extends CacheStoreAdapter<Object, Object> {
+    private static class TestStore extends CacheStoreAdapter<Object, Object> {
         /** {@inheritDoc} */
         @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
             for (int i = 0; i < 10; i++)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceb28354/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index c781193..f42963a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -93,7 +93,6 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
-        suite.addTestSuite(CacheContinuousQueryRestartSelfTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);

Reply via email to