# IGNITE-489 Added EVT_CACHE_PARTITION_NOT_FULLY_LOADED event.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9a51544e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9a51544e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9a51544e Branch: refs/heads/ignite-341 Commit: 9a51544e76fcebad1bae14e40e9541e3eecaf05b Parents: 10be140 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Wed Mar 25 15:09:46 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Wed Mar 25 15:09:46 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/events/EventType.java | 8 + .../processors/cache/GridCacheEventManager.java | 3 +- .../dht/GridDhtPartitionTopologyImpl.java | 6 + .../preloader/GridDhtPartitionDemandPool.java | 4 + ...ridCachePartitionNotLoadedEventSelfTest.java | 187 +++++++++++++++++++ 5 files changed, 207 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/main/java/org/apache/ignite/events/EventType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index 2448d0a..b458344 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -608,6 +608,14 @@ public interface EventType { public static final int EVT_CACHE_REBALANCE_OBJECT_UNLOADED = 85; /** + * Built-in event type: all nodes that hold partition left topology. + * <p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + */ + public static final int EVT_CACHE_PARTITION_NOT_FULLY_LOADED = 86; + + /** * Built-in event type: query executed. * <p> * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index f543e3b..1ff6c71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -240,7 +240,8 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { @Nullable String cloClsName, @Nullable String taskName ) { - assert key != null || type == EVT_CACHE_STARTED || type == EVT_CACHE_STOPPED; + assert key != null || type == EVT_CACHE_STARTED || type == EVT_CACHE_STOPPED + || type == EVT_CACHE_PARTITION_NOT_FULLY_LOADED; if (!cctx.events().isRecordable(type)) LT.warn(log, null, "Added event without checking if event is recordable: " + U.gridEventName(type)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index c33ffcb..5bb4e11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -33,6 +34,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; +import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; /** @@ -440,6 +442,10 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { changed = true; + cctx.events().addEvent(p, null, cctx.localNodeId(), (IgniteUuid)null, + null, EVT_CACHE_PARTITION_NOT_FULLY_LOADED, null, false, null, + false, null, null, null); + if (log.isDebugEnabled()) log.debug("Owned partition: " + locPart); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 98128c8..4d5b83f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -1041,6 +1041,10 @@ public class GridDhtPartitionDemandPool<K, V> { if (picked.isEmpty()) { top.own(part); + cctx.events().addEvent(p, null, cctx.localNodeId(), (IgniteUuid)null, + null, EVT_CACHE_PARTITION_NOT_FULLY_LOADED, null, false, null, + false, null, null, null); + if (log.isDebugEnabled()) log.debug("Owning partition as there are no other owners: " + part); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a51544e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java new file mode 100644 index 0000000..21916f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.testframework.junits.common.*; +import org.eclipse.jetty.util.*; + +import java.util.*; + +/** + * + */ +public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstractTest { + /** */ + private int backupCnt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (gridName.matches(".*\\d")) { + String idStr = UUID.randomUUID().toString(); + + char[] chars = idStr.toCharArray(); + + chars[chars.length - 3] = '0'; + chars[chars.length - 2] = '0'; + chars[chars.length - 1] = gridName.charAt(gridName.length() - 1); + + cfg.setNodeId(UUID.fromString(new String(chars))); + } + + CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); + + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(backupCnt); + + cfg.setCacheConfiguration(cacheCfg); + + cfg.setCommunicationSpi(new FilteredCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * + */ + public void testPrimaryAndBackupDead() throws Exception { + backupCnt = 1; + + startGrid(0); + startGrid(1); + startGrid(2); + + PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); + + ignite(2).events().localListen(lsnr, EventType.EVT_CACHE_PARTITION_NOT_FULLY_LOADED); + + Affinity<Integer> aff = ignite(0).affinity(null); + + int key = 0; + + while (!aff.isPrimary(ignite(0).cluster().localNode(), key) + || !aff.isBackup(ignite(1).cluster().localNode(), key)) + key++; + + IgniteCache<Integer, Integer> cache = jcache(2); + + cache.put(key, key); + + assert jcache(0).containsKey(key); + assert jcache(1).containsKey(key); + +// ((FilteredCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).stop(true); +// ((FilteredCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).stop(true); + + stopGrid(0, false); + stopGrid(1, false); + + //startGrid(4); + + awaitPartitionMapExchange(); + + assert !cache.containsKey(key); + + assert !lsnr.lostParts.isEmpty(); + } + + /** + * + */ + public void testPrimaryDead() throws Exception { + startGrid(0); + startGrid(1); + + PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); + + ignite(1).events().localListen(lsnr, EventType.EVT_CACHE_PARTITION_NOT_FULLY_LOADED); + + int key = primaryKey(jcache(0)); + + jcache(1).put(key, key); + + assert jcache(0).containsKey(key); + +// ((FilteredCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).stop(true); +// ((FilteredCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).stop(true); + + stopGrid(0, true); + + awaitPartitionMapExchange(); + + // startGrid(4); + + awaitPartitionMapExchange(); + + assert !jcache(1).containsKey(key); + + assert !lsnr.lostParts.isEmpty(); + } + + /** + * + */ + private static class PartitionNotFullyLoadedListener implements IgnitePredicate<Event> { + /** */ + private Collection<Integer> lostParts = new ConcurrentHashSet<>(); + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + lostParts.add(((CacheEvent)evt).partition()); + + return true; + } + } + + /** + * + */ + private static class FilteredCommunicationSpi extends TcpCommunicationSpi { + /** */ + private volatile boolean stop; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (!stop) + super.sendMessage(node, msg); + } + + /** + * @param stop Filter. + */ + public void stop(boolean stop) { + this.stop = stop; + } + } +}