ignite-484-1 - replicated cache group reservation fix + drop reservations group for dead nodes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1fe215e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1fe215e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1fe215e2 Branch: refs/heads/ignite-843 Commit: 1fe215e2cc83954f25cc7c2f0974dcf312694eb8 Parents: 68c35e7 Author: S.Vladykin <svlady...@gridgain.com> Authored: Wed Jun 17 15:57:24 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Wed Jun 17 15:57:24 2015 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 22 ++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1fe215e2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 42f01cb..aaf64ee 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -123,6 +123,18 @@ public class GridMapQueryExecutor { } }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); + // Drop group reservations for dead caches. + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + String cacheName = ((CacheEvent)evt).cacheName(); + + for (T2<String,AffinityTopologyVersion> grpKey : reservations.keySet()) { + if (F.eq(grpKey.get1(), cacheName)) + reservations.remove(grpKey); + } + } + }, EventType.EVT_CACHE_STOPPED); + ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object msg) { if (!busyLock.enterBusy()) @@ -244,7 +256,9 @@ public class GridMapQueryExecutor { if (cctx.isLocal()) continue; - final T2<String,AffinityTopologyVersion> grpKey = new T2<>(cctx.name(), topVer); + // For replicated cache topology version does not make sense. + final T2<String,AffinityTopologyVersion> grpKey = + new T2<>(cctx.name(), cctx.isReplicated() ? null : topVer); GridReservable r = reservations.get(grpKey); @@ -265,10 +279,10 @@ public class GridMapQueryExecutor { // We don't need to reserve partitions because they will not be evicted in replicated caches. if (part == null || part.state() != OWNING) return false; - - // Mark that we checked this replicated cache. - reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE); } + + // Mark that we checked this replicated cache. + reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE); } } else { // Reserve primary partitions for partitioned cache (if no explicit given).