# ignite-883
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6bfc78ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6bfc78ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6bfc78ea Branch: refs/heads/ignite-389 Commit: 6bfc78ea6a4559b4ee9059893e0a7f9d4195ad3c Parents: a3eb572 Author: sboikov <sboi...@gridgain.com> Authored: Thu Jun 4 10:04:18 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jun 4 10:04:18 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 18 ++++++++-- .../datastructures/DataStructuresProcessor.java | 36 +++++++++++++++++--- 2 files changed, 48 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bfc78ea/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index e3fc50f..27d2dc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -167,6 +167,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @GridToStringExclude private Timer updateNtfTimer; + /** */ + @GridToStringExclude + private GridTimeoutProcessor.CancelableTask starveTask; + + /** */ + @GridToStringExclude + private GridTimeoutProcessor.CancelableTask metricsLogTask; + /** Indicate error on grid stop. */ @GridToStringExclude private boolean errOnStop; @@ -859,7 +867,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (starveCheck) { final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr); - ctx.timeout().schedule(new Runnable() { + starveTask = ctx.timeout().schedule(new Runnable() { /** Last completed task count. */ private long lastCompletedCnt; @@ -886,7 +894,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { long metricsLogFreq = cfg.getMetricsLogFrequency(); if (metricsLogFreq > 0) { - ctx.timeout().schedule(new Runnable() { + metricsLogTask = ctx.timeout().schedule(new Runnable() { private final DecimalFormat dblFmt = new DecimalFormat("#.##"); @Override public void run() { @@ -1700,6 +1708,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (updateNtfTimer != null) updateNtfTimer.cancel(); + if (starveTask != null) + starveTask.close(); + + if (metricsLogTask != null) + metricsLogTask.close(); + boolean interrupted = false; while (true) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bfc78ea/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 27f6a29..2138639 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.datastructures; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; @@ -145,7 +146,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx = atomicsCache.context(); qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(), - null, + new DataStructuresEntryFilter(), dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(), false); } @@ -1051,10 +1052,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** * */ - private class DataStructuresEntryListener implements CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> { + static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts) - throws CacheEntryListenerException { + @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { + if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) + return evt.getValue() instanceof GridCacheCountDownLatchValue; + else { + assert evt.getEventType() == EventType.REMOVED : evt; + + return true; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStructuresEntryFilter.class, this); + } + } + + /** + * + */ + private class DataStructuresEntryListener implements + CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> { + /** {@inheritDoc} */ + @Override public void onUpdated( + Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts) + throws CacheEntryListenerException + { for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) { if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) { GridCacheInternal val0 = evt.getValue();