IGNITE-1026 - Count down latch fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6f50ad9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6f50ad9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6f50ad9f Branch: refs/heads/ignite-648 Commit: 6f50ad9f2d7a0e049863f235104c65c86ee8c5ad Parents: 8ff3619 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu Jul 2 16:05:03 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu Jul 2 16:05:03 2015 -0700 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 4 +-- .../datastructures/DataStructuresProcessor.java | 31 +++++++++++++++++--- .../GridCacheCountDownLatchImpl.java | 12 ++++---- 3 files changed, 36 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f50ad9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index ff2905f..e059760 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -353,8 +353,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<K, V>(cache, cctx, e); - }; + return new CacheContinuousQueryEvent<>(cache, cctx, e); + } } ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f50ad9f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 4d2ecbe..5c171e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -1056,7 +1056,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsView.remove(key); tx.commit(); - } else + } + else tx.setRollbackOnly(); return null; @@ -1147,19 +1148,41 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { GridCacheInternalKey key = evt.getKey(); // Notify latch on changes. - GridCacheRemovable latch = dsMap.get(key); + final GridCacheRemovable latch = dsMap.get(key); GridCacheCountDownLatchValue val = (GridCacheCountDownLatchValue)val0; if (latch instanceof GridCacheCountDownLatchEx) { - GridCacheCountDownLatchEx latch0 = (GridCacheCountDownLatchEx)latch; + final GridCacheCountDownLatchEx latch0 = (GridCacheCountDownLatchEx)latch; latch0.onUpdate(val.get()); if (val.get() == 0 && val.autoDelete()) { dsMap.remove(key); - latch.onRemoved(); + IgniteInternalFuture<?> removeFut = ctx.closure().runLocalSafe(new GPR() { + @Override public void run() { + try { + removeCountDownLatch(latch0.name()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to remove count down latch: " + latch0.name(), e); + } + } + }); + + removeFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + f.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to remove count down latch: " + latch0.name(), e); + } + + latch.onRemoved(); + } + }); } } else if (latch != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f50ad9f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index a5353d8..2df6015 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -288,11 +288,13 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public void close() { - try { - ctx.kernalContext().dataStructures().removeCountDownLatch(name); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + if (!rmvd) { + try { + ctx.kernalContext().dataStructures().removeCountDownLatch(name); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } }