Repository: incubator-ignite
Updated Branches:
  refs/heads/sprint-1 f0a7a7390 -> 2d20498d2


IGNITE-262 - Fix for internal continuous queries


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/66fd52c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/66fd52c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/66fd52c0

Branch: refs/heads/sprint-1
Commit: 66fd52c0e6c5a1771005f21135cdceedaa0c7861
Parents: 4db4462
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sun Feb 15 16:36:04 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sun Feb 15 16:36:04 2015 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  7 ++-
 .../processors/cache/GridCacheMapEntry.java     | 15 +++---
 .../processors/cache/IgniteCacheProxy.java      |  2 +-
 .../CacheDataStructuresManager.java             |  3 +-
 .../continuous/CacheContinuousQueryHandler.java | 12 +++++
 .../CacheContinuousQueryListener.java           |  2 +
 .../continuous/CacheContinuousQueryManager.java | 57 +++++++++++++++++---
 .../service/GridServiceProcessor.java           |  6 ++-
 .../hadoop/jobtracker/GridHadoopJobTracker.java |  1 +
 9 files changed, 81 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7b426cb..7d10869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1938,7 +1938,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<V> getAsync(final K key) {
         A.notNull(key, "key");
-        
+
         final boolean statsEnabled = ctx.config().isStatisticsEnabled();
 
         final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -1980,7 +1980,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable 
final Collection<? extends K> keys) {
         A.notNull(keys, "keys");
-        
+
         final boolean statsEnabled = ctx.config().isStatisticsEnabled();
 
         final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -4224,10 +4224,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     }
 
     /**
-     * @param delegate Cache proxy.
      * @return Distributed ignite cache iterator.
      */
-    public Iterator<Cache.Entry<K, V>> igniteIterator(final 
IgniteCacheProxy<K, V> delegate) {
+    public Iterator<Cache.Entry<K, V>> igniteIterator() {
         CacheQueryFuture<Map.Entry<K, V>> fut = queries().createScanQuery(null)
             .keepAll(false)
             .execute();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index af8d2cf..6795830 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1095,7 +1095,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             }
 
             if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1234,7 +1234,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             }
 
                 if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                    cctx.continuousQueries().onEntryUpdated(this, key, null, 
null, old, oldBytes);
+                    cctx.continuousQueries().onEntryUpdated(this, key, null, 
null, old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, true);
         }
@@ -1535,7 +1535,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
+            cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -2108,7 +2108,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 updateMetrics(op, metrics);
 
             if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -3131,10 +3131,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 drReplicate(drType, val, valBytes, ver);
 
                 if (!skipQryNtf) {
-                    if (!preload && (cctx.isLocal() || cctx.isReplicated() ||
-                        cctx.affinity().primary(cctx.localNode(), key, 
topVer)))
-                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, valueBytesUnlocked(), null, null);
-
+                    if (cctx.isLocal() || cctx.isReplicated() || 
cctx.affinity().primary(cctx.localNode(), key, topVer))
+                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, valueBytesUnlocked(), null, null,
+                            preload);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 30a9708..4d901d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1225,7 +1225,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return ctx.cache().igniteIterator(this);
+            return ctx.cache().igniteIterator();
         }
         finally {
             gate.leave(prev);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 407da34..25abfb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -217,7 +217,8 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K,
                         }
                     },
                     new QueueHeaderPredicate(),
-                    cctx.isLocal() || cctx.isReplicated());
+                    cctx.isLocal() || cctx.isReplicated(),
+                    true);
             }
 
             GridCacheQueueProxy queue = queuesMap.get(hdr.id());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4ad664a..82b28cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -61,6 +61,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** Internal flag. */
     private boolean internal;
 
+    /** Notify existing flag. */
+    private boolean notifyExisting;
+
     /** Old value required flag. */
     private boolean oldValRequired;
 
@@ -91,6 +94,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
      * @param internal Internal flag.
+     * @param notifyExisting Notify existing flag.
      * @param oldValRequired Old value required flag.
      * @param sync Synchronous flag.
      * @param ignoreExpired Ignore expired events flag.
@@ -103,6 +107,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         CacheEntryUpdatedListener<K, V> locLsnr,
         CacheEntryEventFilter<K, V> rmtFilter,
         boolean internal,
+        boolean notifyExisting,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
@@ -116,6 +121,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         this.locLsnr = locLsnr;
         this.rmtFilter = rmtFilter;
         this.internal = internal;
+        this.notifyExisting = notifyExisting;
         this.oldValRequired = oldValRequired;
         this.sync = sync;
         this.ignoreExpired = ignoreExpired;
@@ -254,6 +260,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 return oldValRequired;
             }
 
+            @Override public boolean notifyExisting() {
+                return notifyExisting;
+            }
+
             private String taskName() {
                 return ctx.security().enabled() ? 
ctx.task().resolveTaskName(taskHash) : null;
             }
@@ -373,6 +383,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
             out.writeObject(rmtFilter);
 
         out.writeBoolean(internal);
+        out.writeBoolean(notifyExisting);
         out.writeBoolean(oldValRequired);
         out.writeBoolean(sync);
         out.writeBoolean(ignoreExpired);
@@ -393,6 +404,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
             rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject();
 
         internal = in.readBoolean();
+        notifyExisting = in.readBoolean();
         oldValRequired = in.readBoolean();
         sync = in.readBoolean();
         ignoreExpired = in.readBoolean();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 3695bad..b779c18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -44,4 +44,6 @@ interface CacheContinuousQueryListener<K, V> {
      * @return Whether old value is required.
      */
     public boolean oldValueRequired();
+
+    public boolean notifyExisting();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c2352c2..60b22e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -118,16 +118,22 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @param newBytes New value bytes.
      * @param oldVal Old value.
      * @param oldBytes Old value bytes.
+     * @param preload Whether update happened during preloading.
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, 
GridCacheValueBytes newBytes,
-        V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException {
+        V oldVal, GridCacheValueBytes oldBytes, boolean preload) throws 
IgniteCheckedException {
         assert e != null;
         assert key != null;
 
+        boolean internal = e.isInternal();
+
+        if (preload && !internal)
+            return;
+
         ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol;
 
-        if (e.isInternal())
+        if (internal)
             lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
         else
             lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
@@ -146,9 +152,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         boolean initialized = false;
 
         boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1);
-        boolean recordIgniteEvt = !e.isInternal() && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+        boolean recordIgniteEvt = !internal && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
+            if (preload && !lsnr.notifyExisting())
+                continue;
+
             if (!initialized) {
                 if (lsnr.oldValueRequired()) {
                     oldVal = cctx.unwrapTemporary(oldVal);
@@ -188,7 +197,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         if (e.isInternal())
             return;
 
-        ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol = 
lsnrs;
+        ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol = 
lsnrCnt.get() > 0 ? lsnrs : null;
 
         if (F.isEmpty(lsnrCol))
             return;
@@ -240,6 +249,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
             timeInterval,
             autoUnsubscribe,
             false,
+            false,
             true,
             false,
             true,
@@ -250,11 +260,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
      * @param loc Local flag.
+     * @param notifyExisting Notify existing flag.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
     public UUID executeInternalQuery(CacheEntryUpdatedListener<K, V> locLsnr, 
CacheEntryEventFilter<K, V> rmtFilter,
-        boolean loc) throws IgniteCheckedException {
+        boolean loc, boolean notifyExisting) throws IgniteCheckedException {
         return executeQuery0(
             locLsnr,
             rmtFilter,
@@ -262,6 +273,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
             ContinuousQuery.DFLT_TIME_INTERVAL,
             ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
             true,
+            notifyExisting,
             true,
             false,
             true,
@@ -320,6 +332,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @param timeInterval Time interval.
      * @param autoUnsubscribe Auto unsubscribe flag.
      * @param internal Internal flag.
+     * @param notifyExisting Notify existing flag.
      * @param oldValRequired Old value required flag.
      * @param sync Synchronous flag.
      * @param ignoreExpired Ignore expired event flag.
@@ -328,8 +341,8 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @throws IgniteCheckedException In case of error.
      */
     private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, 
CacheEntryEventFilter<K, V> rmtFilter,
-        int bufSize, long timeInterval, boolean autoUnsubscribe, boolean 
internal, boolean oldValRequired,
-        boolean sync, boolean ignoreExpired, ClusterGroup grp) throws 
IgniteCheckedException {
+        int bufSize, long timeInterval, boolean autoUnsubscribe, boolean 
internal, boolean notifyExisting,
+        boolean oldValRequired, boolean sync, boolean ignoreExpired, 
ClusterGroup grp) throws IgniteCheckedException {
         cctx.checkSecurity(GridSecurityPermission.CACHE_READ);
 
         if (grp == null)
@@ -376,14 +389,41 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
             locLsnr,
             rmtFilter,
             internal,
+            notifyExisting,
             oldValRequired,
             sync,
             ignoreExpired,
             taskNameHash,
             skipPrimaryCheck);
 
-        return cctx.kernalContext().continuous().startRoutine(hnd, bufSize, 
timeInterval,
+        UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, 
timeInterval,
             autoUnsubscribe, grp.predicate()).get();
+
+        final Iterator<Cache.Entry<K, V>> it = cctx.cache().igniteIterator();
+
+        locLsnr.onUpdated(new Iterable<CacheEntryEvent<? extends K, ? extends 
V>>() {
+            @Override public Iterator<CacheEntryEvent<? extends K, ? extends 
V>> iterator() {
+                return new Iterator<CacheEntryEvent<? extends K, ? extends 
V>>() {
+                    @Override public boolean hasNext() {
+                        return it.hasNext();
+                    }
+
+                    @Override public CacheEntryEvent<? extends K, ? extends V> 
next() {
+                        Cache.Entry<K, V> e = it.next();
+
+                        return new CacheContinuousQueryEvent<>(
+                            cctx.kernalContext().cache().jcache(cctx.name()), 
CREATED,
+                            new CacheContinuousQueryEntry<>(e.getKey(), 
e.getValue(), null, null, null));
+                    }
+
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        });
+
+        return id;
     }
 
     /**
@@ -496,6 +536,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
                 ContinuousQuery.DFLT_TIME_INTERVAL,
                 ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
                 false,
+                false,
                 cfg.isOldValueRequired(),
                 cfg.isSynchronous(),
                 false,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 660fd03..36500e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -126,9 +126,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
-            cfgQryId = 
cache.context().continuousQueries().executeInternalQuery(new 
DeploymentListener(), null, true);
+            cfgQryId = 
cache.context().continuousQueries().executeInternalQuery(
+                new DeploymentListener(), null, true, true);
+
             assignQryId = 
cache.context().continuousQueries().executeInternalQuery(
-                new AssignmentListener(), null, true);
+                new AssignmentListener(), null, true, true);
         }
         finally {
             if (ctx.deploy().enabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/66fd52c0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
index 6ec4058..a7e3b8f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
@@ -190,6 +190,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                 }
             },
             null,
+            true,
             true
         );
 

Reply via email to