# IGNITE-489 Added EVT_CACHE_PARTITION_NOT_FULLY_LOADED event.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9a51544e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9a51544e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9a51544e

Branch: refs/heads/sprint-3
Commit: 9a51544e76fcebad1bae14e40e9541e3eecaf05b
Parents: 10be140
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Wed Mar 25 15:09:46 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Wed Mar 25 15:09:46 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/events/EventType.java     |   8 +
 .../processors/cache/GridCacheEventManager.java |   3 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   6 +
 .../preloader/GridDhtPartitionDemandPool.java   |   4 +
 ...ridCachePartitionNotLoadedEventSelfTest.java | 187 +++++++++++++++++++
 5 files changed, 207 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 2448d0a..b458344 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -608,6 +608,14 @@ public interface EventType {
     public static final int EVT_CACHE_REBALANCE_OBJECT_UNLOADED = 85;
 
     /**
+     * Built-in event type: all nodes that hold a partition have left the topology.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     */
+    public static final int EVT_CACHE_PARTITION_NOT_FULLY_LOADED = 86;
+
+    /**
      * Built-in event type: query executed.
      * <p>
      * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index f543e3b..1ff6c71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -240,7 +240,8 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
         @Nullable String cloClsName,
         @Nullable String taskName
     ) {
-        assert key != null || type == EVT_CACHE_STARTED || type == EVT_CACHE_STOPPED;
+        assert key != null || type == EVT_CACHE_STARTED || type == EVT_CACHE_STOPPED
+            || type == EVT_CACHE_PARTITION_NOT_FULLY_LOADED;
 
         if (!cctx.events().isRecordable(type))
             LT.warn(log, null, "Added event without checking if event is recordable: " + U.gridEventName(type));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index c33ffcb..5bb4e11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -33,6 +34,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.*;
 
+import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 
 /**
@@ -440,6 +442,10 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
 
                                 changed = true;
 
+                                cctx.events().addEvent(p, null, cctx.localNodeId(), (IgniteUuid)null,
+                                    null, EVT_CACHE_PARTITION_NOT_FULLY_LOADED, null, false, null,
+                                    false, null, null, null);
+
                                 if (log.isDebugEnabled())
                                     log.debug("Owned partition: " + locPart);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 98128c8..4d5b83f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -1041,6 +1041,10 @@ public class GridDhtPartitionDemandPool<K, V> {
                 if (picked.isEmpty()) {
                     top.own(part);
 
+                    cctx.events().addEvent(p, null, cctx.localNodeId(), (IgniteUuid)null,
+                        null, EVT_CACHE_PARTITION_NOT_FULLY_LOADED, null, false, null,
+                        false, null, null, null);
+
                     if (log.isDebugEnabled())
                         log.debug("Owning partition as there are no other owners: " + part);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
new file mode 100644
index 0000000..21916f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.eclipse.jetty.util.*;
+
+import java.util.*;
+
+/**
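+ * Tests that {@link EventType#EVT_CACHE_PARTITION_NOT_FULLY_LOADED} is fired on a surviving node after the nodes holding a key are stopped.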
+ */
+public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstractTest {
+    /** */
+    private int backupCnt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.matches(".*\\d")) {
+            String idStr = UUID.randomUUID().toString();
+
+            char[] chars = idStr.toCharArray();
+
+            chars[chars.length - 3] = '0';
+            chars[chars.length - 2] = '0';
+            chars[chars.length - 1] = gridName.charAt(gridName.length() - 1);
+
+            cfg.setNodeId(UUID.fromString(new String(chars)));
+        }
+
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setBackups(backupCnt);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        cfg.setCommunicationSpi(new FilteredCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
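+     * Stops both the primary and the backup node of a key and checks that the event is fired on the remaining node.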
+     */
+    public void testPrimaryAndBackupDead() throws Exception {
+        backupCnt = 1;
+
+        startGrid(0);
+        startGrid(1);
+        startGrid(2);
+
+        PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+
+        ignite(2).events().localListen(lsnr, EventType.EVT_CACHE_PARTITION_NOT_FULLY_LOADED);
+
+        Affinity<Integer> aff = ignite(0).affinity(null);
+
+        int key = 0;
+
+        while (!aff.isPrimary(ignite(0).cluster().localNode(), key)
+            || !aff.isBackup(ignite(1).cluster().localNode(), key))
+            key++;
+
+        IgniteCache<Integer, Integer> cache = jcache(2);
+
+        cache.put(key, key);
+
+        assert jcache(0).containsKey(key);
+        assert jcache(1).containsKey(key);
+
+//        ((FilteredCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).stop(true);
+//        ((FilteredCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).stop(true);
+
+        stopGrid(0, false);
+        stopGrid(1, false);
+
+        //startGrid(4);
+
+        awaitPartitionMapExchange();
+
+        assert !cache.containsKey(key);
+
+        assert !lsnr.lostParts.isEmpty();
+    }
+
+    /**
+     * Stops the primary node of a key and checks that the event is fired on the remaining node.
+     */
+    public void testPrimaryDead() throws Exception {
+        startGrid(0);
+        startGrid(1);
+
+        PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+
+        ignite(1).events().localListen(lsnr, EventType.EVT_CACHE_PARTITION_NOT_FULLY_LOADED);
+
+        int key = primaryKey(jcache(0));
+
+        jcache(1).put(key, key);
+
+        assert jcache(0).containsKey(key);
+
+//        ((FilteredCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).stop(true);
+//        ((FilteredCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).stop(true);
+
+        stopGrid(0, true);
+
+        awaitPartitionMapExchange();
+
+       // startGrid(4);
+
+        awaitPartitionMapExchange();
+
+        assert !jcache(1).containsKey(key);
+
+        assert !lsnr.lostParts.isEmpty();
+    }
+
+    /**
+     *
+     */
+    private static class PartitionNotFullyLoadedListener implements IgnitePredicate<Event> {
+        /** */
+        private Collection<Integer> lostParts = new ConcurrentHashSet<>();
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            lostParts.add(((CacheEvent)evt).partition());
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class FilteredCommunicationSpi extends TcpCommunicationSpi {
+        /** */
+        private volatile boolean stop;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (!stop)
+                super.sendMessage(node, msg);
+        }
+
+        /**
+         * @param stop If {@code true}, outgoing messages are dropped instead of being sent.
+         */
+        public void stop(boolean stop) {
+            this.stop = stop;
+        }
+    }
+}

Reply via email to