Repository: incubator-ignite Updated Branches: refs/heads/ignite-262 [created] 66fd52c0e
IGNITE-262 - Fix for internal continuous queries Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/66fd52c0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/66fd52c0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/66fd52c0 Branch: refs/heads/ignite-262 Commit: 66fd52c0e6c5a1771005f21135cdceedaa0c7861 Parents: 4db4462 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun Feb 15 16:36:04 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun Feb 15 16:36:04 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 7 ++- .../processors/cache/GridCacheMapEntry.java | 15 +++--- .../processors/cache/IgniteCacheProxy.java | 2 +- .../CacheDataStructuresManager.java | 3 +- .../continuous/CacheContinuousQueryHandler.java | 12 +++++ .../CacheContinuousQueryListener.java | 2 + .../continuous/CacheContinuousQueryManager.java | 57 +++++++++++++++++--- .../service/GridServiceProcessor.java | 6 ++- .../hadoop/jobtracker/GridHadoopJobTracker.java | 1 + 9 files changed, 81 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 7b426cb..7d10869 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1938,7 +1938,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> getAsync(final K key) { A.notNull(key, "key"); - + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -1980,7 +1980,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) { A.notNull(keys, "keys"); - + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -4224,10 +4224,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** - * @param delegate Cache proxy. * @return Distributed ignite cache iterator. */ - public Iterator<Cache.Entry<K, V>> igniteIterator(final IgniteCacheProxy<K, V> delegate) { + public Iterator<Cache.Entry<K, V>> igniteIterator() { CacheQueryFuture<Map.Entry<K, V>> fut = queries().createScanQuery(null) .keepAll(false) .execute(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index af8d2cf..6795830 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1095,7 +1095,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1234,7 +1234,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes); + cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1535,7 +1535,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2108,7 +2108,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updateMetrics(op, metrics); if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -3131,10 +3131,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> drReplicate(drType, val, valBytes, ver); if (!skipQryNtf) { - if (!preload && (cctx.isLocal() || cctx.isReplicated() || - cctx.affinity().primary(cctx.localNode(), key, topVer))) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null); - + if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null, + preload); cctx.dataStructures().onEntryUpdated(key, false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 30a9708..4d901d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1225,7 +1225,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return ctx.cache().igniteIterator(this); + return ctx.cache().igniteIterator(); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 407da34..25abfb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -217,7 +217,8 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } }, new QueueHeaderPredicate(), - cctx.isLocal() || cctx.isReplicated()); + cctx.isLocal() || cctx.isReplicated(), + true); } GridCacheQueueProxy queue = queuesMap.get(hdr.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 4ad664a..82b28cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -61,6 +61,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Internal flag. */ private boolean internal; + /** Notify existing flag. */ + private boolean notifyExisting; + /** Old value required flag. */ private boolean oldValRequired; @@ -91,6 +94,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param locLsnr Local listener. * @param rmtFilter Remote filter. * @param internal Internal flag. + * @param notifyExisting Notify existing flag. * @param oldValRequired Old value required flag. * @param sync Synchronous flag. * @param ignoreExpired Ignore expired events flag. @@ -103,6 +107,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, boolean internal, + boolean notifyExisting, boolean oldValRequired, boolean sync, boolean ignoreExpired, @@ -116,6 +121,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.locLsnr = locLsnr; this.rmtFilter = rmtFilter; this.internal = internal; + this.notifyExisting = notifyExisting; this.oldValRequired = oldValRequired; this.sync = sync; this.ignoreExpired = ignoreExpired; @@ -254,6 +260,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { return oldValRequired; } + @Override public boolean notifyExisting() { + return notifyExisting; + } + private String taskName() { return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } @@ -373,6 +383,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { out.writeObject(rmtFilter); out.writeBoolean(internal); + out.writeBoolean(notifyExisting); out.writeBoolean(oldValRequired); out.writeBoolean(sync); out.writeBoolean(ignoreExpired); @@ -393,6 +404,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject(); internal = in.readBoolean(); + notifyExisting = in.readBoolean(); oldValRequired = in.readBoolean(); sync = in.readBoolean(); ignoreExpired = in.readBoolean(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 3695bad..b779c18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -44,4 +44,6 @@ interface CacheContinuousQueryListener<K, V> { * @return Whether old value is required. */ public boolean oldValueRequired(); + + public boolean notifyExisting(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index c2352c2..60b22e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -118,16 +118,22 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @param newBytes New value bytes. * @param oldVal Old value. * @param oldBytes Old value bytes. + * @param preload Whether update happened during preloading. * @throws IgniteCheckedException In case of error. */ public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, GridCacheValueBytes newBytes, - V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException { + V oldVal, GridCacheValueBytes oldBytes, boolean preload) throws IgniteCheckedException { assert e != null; assert key != null; + boolean internal = e.isInternal(); + + if (preload && !internal) + return; + ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol; - if (e.isInternal()) + if (internal) lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null; else lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; @@ -146,9 +152,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K boolean initialized = false; boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); - boolean recordIgniteEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { + if (preload && !lsnr.notifyExisting()) + continue; + if (!initialized) { if (lsnr.oldValueRequired()) { oldVal = cctx.unwrapTemporary(oldVal); @@ -188,7 +197,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K if (e.isInternal()) return; - ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol = lsnrs; + ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; if (F.isEmpty(lsnrCol)) return; @@ -240,6 +249,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K timeInterval, autoUnsubscribe, false, + false, true, false, true, @@ -250,11 +260,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @param locLsnr Local listener. * @param rmtFilter Remote filter. * @param loc Local flag. + * @param notifyExisting Notify existing flag. * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ public UUID executeInternalQuery(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, - boolean loc) throws IgniteCheckedException { + boolean loc, boolean notifyExisting) throws IgniteCheckedException { return executeQuery0( locLsnr, rmtFilter, @@ -262,6 +273,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, true, + notifyExisting, true, false, true, @@ -320,6 +332,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @param timeInterval Time interval. * @param autoUnsubscribe Auto unsubscribe flag. * @param internal Internal flag. + * @param notifyExisting Notify existing flag. * @param oldValRequired Old value required flag. * @param sync Synchronous flag. * @param ignoreExpired Ignore expired event flag. @@ -328,8 +341,8 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @throws IgniteCheckedException In case of error. */ private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, - int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean oldValRequired, - boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException { + int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean notifyExisting, + boolean oldValRequired, boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException { cctx.checkSecurity(GridSecurityPermission.CACHE_READ); if (grp == null) @@ -376,14 +389,41 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K locLsnr, rmtFilter, internal, + notifyExisting, oldValRequired, sync, ignoreExpired, taskNameHash, skipPrimaryCheck); - return cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, + UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, autoUnsubscribe, grp.predicate()).get(); + + final Iterator<Cache.Entry<K, V>> it = cctx.cache().igniteIterator(); + + locLsnr.onUpdated(new Iterable<CacheEntryEvent<? extends K, ? extends V>>() { + @Override public Iterator<CacheEntryEvent<? extends K, ? extends V>> iterator() { + return new Iterator<CacheEntryEvent<? extends K, ? extends V>>() { + @Override public boolean hasNext() { + return it.hasNext(); + } + + @Override public CacheEntryEvent<? extends K, ? extends V> next() { + Cache.Entry<K, V> e = it.next(); + + return new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().jcache(cctx.name()), CREATED, + new CacheContinuousQueryEntry<>(e.getKey(), e.getValue(), null, null, null)); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }); + + return id; } /** @@ -496,6 +536,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, false, + false, cfg.isOldValueRequired(), cfg.isSynchronous(), false, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 660fd03..36500e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -126,9 +126,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (ctx.deploy().enabled()) ctx.cache().context().deploy().ignoreOwnership(true); - cfgQryId = cache.context().continuousQueries().executeInternalQuery(new DeploymentListener(), null, true); + cfgQryId = cache.context().continuousQueries().executeInternalQuery( + new DeploymentListener(), null, true, true); + assignQryId = cache.context().continuousQueries().executeInternalQuery( - new AssignmentListener(), null, true); + new AssignmentListener(), null, true, true); } finally { if (ctx.deploy().enabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java index 6ec4058..a7e3b8f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java @@ -190,6 +190,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } }, null, + true, true );