http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
deleted file mode 100644
index 7bea056..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-/**
- * Continuous query listener.
- */
-interface GridCacheContinuousQueryListener<K, V> {
-    /**
-     * Query execution callback.
-     */
-    public void onExecution();
-
-    /**
-     * Entry update callback.
-     *
-     * @param e Entry.
-     * @param recordEvt Whether to record event.
-     */
-    public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt);
-
-    /**
-     * Listener unregistered callback.
-     */
-    public void onUnregister();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
deleted file mode 100644
index 23913e4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.internal.processors.cache.CacheEntryEvent;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import javax.cache.event.*;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static javax.cache.event.EventType.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-
-/**
- * Continuous queries manager.
- */
-public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
-    /** Ordered topic prefix. */
-    private String topicPrefix;
-
-    /** Listeners. */
-    private final ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrs = new ConcurrentHashMap8<>();
-
-    /** Listeners count. */
-    private final AtomicInteger lsnrCnt = new AtomicInteger();
-
-    /** Internal entries listeners. */
-    private final ConcurrentMap<UUID, ListenerInfo<K, V>> intLsnrs = new ConcurrentHashMap8<>();
-
-    /** Internal listeners count. */
-    private final AtomicInteger intLsnrCnt = new AtomicInteger();
-
-    /** Query sequence number for message topic. */
-    private final AtomicLong seq = new AtomicLong();
-
-    /** Continues queries created for cache event listeners. */
-    private final ConcurrentMap<CacheEntryListenerConfiguration, CacheContinuousQuery<K, V>> lsnrQrys =
-        new ConcurrentHashMap8<>();
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        // Append cache name to the topic.
-        topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected void onKernalStart0() throws IgniteCheckedException {
-        Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations();
-
-        if (lsnrCfgs != null) {
-            for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs)
-                registerCacheEntryListener(cfg, false);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStop0(boolean cancel) {
-        super.onKernalStop0(cancel);
-
-        for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) {
-            try {
-                deregisterCacheEntryListener(lsnrCfg);
-            }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to remove cache entry listener: " + e);
-            }
-        }
-    }
-
-    /**
-     * @param prjPred Projection predicate.
-     * @return New continuous query.
-     */
-    public CacheContinuousQuery<K, V> createQuery(@Nullable IgnitePredicate<Cache.Entry<K, V>> prjPred) {
-        Object topic = TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement());
-
-        return new GridCacheContinuousQueryAdapter<>(cctx, topic, prjPred);
-    }
-
-    /**
-     * @param e Cache entry.
-     * @param key Key.
-     * @param newVal New value.
-     * @param newBytes New value bytes.
-     * @param oldVal Old value.
-     * @param oldBytes Old value bytes.
-     * @param preload {@code True} if entry is updated during preloading.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void onEntryUpdate(GridCacheEntryEx<K, V> e,
-        K key,
-        @Nullable V newVal,
-        @Nullable GridCacheValueBytes newBytes,
-        V oldVal,
-        @Nullable GridCacheValueBytes oldBytes,
-        boolean preload) throws IgniteCheckedException {
-        assert e != null;
-        assert key != null;
-
-        ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol;
-
-        if (e.isInternal())
-            lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
-        else
-            lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
-
-        if (F.isEmpty(lsnrCol))
-            return;
-
-        oldVal = cctx.unwrapTemporary(oldVal);
-
-        EventType evtType = newVal == null ? REMOVED :
-            ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? UPDATED : CREATED));
-
-        GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>(
-            cctx,
-            e.wrap(),
-            key,
-            newVal,
-            newBytes,
-            oldVal,
-            oldBytes,
-            evtType);
-
-        e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader());
-
-        boolean recordEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
-        for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
-            if (preload && lsnr.entryListener())
-                continue;
-
-            lsnr.onEntryUpdate(e0, recordEvt);
-        }
-    }
-
-    /**
-     * @param e Entry.
-     * @param key Key.
-     * @param oldVal Old value.
-     * @param oldBytes Old value bytes.
-     */
-    public void onEntryExpired(GridCacheEntryEx<K, V> e,
-        K key,
-        V oldVal,
-        @Nullable GridCacheValueBytes oldBytes) {
-        if (e.isInternal())
-            return;
-
-        ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol = lsnrs;
-
-        if (F.isEmpty(lsnrCol))
-            return;
-
-        if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) {
-            GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>(
-                cctx,
-                e.wrap(),
-                key,
-                null,
-                null,
-                oldVal,
-                oldBytes,
-                EXPIRED);
-
-            for (ListenerInfo<K, V> lsnr : lsnrCol.values()) {
-                if (!lsnr.entryListener())
-                    continue;
-
-                lsnr.onEntryUpdate(e0, false);
-            }
-        }
-    }
-
-    /**
-     * @param lsnrCfg Listener configuration.
-     * @param addToCfg If {@code true} adds listener configuration to cache configuration.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg, boolean addToCfg)
-        throws IgniteCheckedException {
-        GridCacheContinuousQueryAdapter<K, V> qry = null;
-
-        try {
-            A.notNull(lsnrCfg, "lsnrCfg");
-
-            Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory();
-
-            A.notNull(factory, "cacheEntryListenerFactory");
-
-            CacheEntryListener lsnr = factory.create();
-
-            A.notNull(lsnr, "lsnr");
-
-            IgniteCacheProxy<K, V> cache= cctx.kernalContext().cache().jcache(cctx.name());
-
-            EntryListenerCallback cb = new EntryListenerCallback(cache, lsnr);
-
-            if (!(cb.create() || cb.update() || cb.remove() || cb.expire()))
-                throw new IllegalArgumentException("Listener must implement one of CacheEntryListener sub-interfaces.");
-
-            qry = (GridCacheContinuousQueryAdapter<K, V>)cctx.cache().queries().createContinuousQuery();
-
-            CacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, qry);
-
-            if (old != null)
-                throw new IllegalArgumentException("Listener is already registered for configuration: " + lsnrCfg);
-
-            qry.autoUnsubscribe(true);
-
-            qry.bufferSize(1);
-
-            qry.localCallback(cb);
-
-            EntryListenerFilter<K, V> fltr = new EntryListenerFilter<>(cb.create(),
-                cb.update(),
-                cb.remove(),
-                cb.expire(),
-                lsnrCfg.getCacheEntryEventFilterFactory(),
-                cctx.kernalContext().grid(),
-                cctx.name());
-
-            qry.remoteFilter(fltr);
-
-            qry.execute(null, false, true, lsnrCfg.isSynchronous(), lsnrCfg.isOldValueRequired());
-
-            if (addToCfg)
-                cctx.config().addCacheEntryListenerConfiguration(lsnrCfg);
-        }
-        catch (IgniteCheckedException e) {
-            lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it.
-
-            throw e;
-        }
-    }
-
-    /**
-     * @param lsnrCfg Listener configuration.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) throws IgniteCheckedException {
-        A.notNull(lsnrCfg, "lsnrCfg");
-
-        CacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg);
-
-        if (qry != null) {
-            cctx.config().removeCacheEntryListenerConfiguration(lsnrCfg);
-
-            qry.close();
-        }
-    }
-
-    /**
-     * @param lsnrId Listener ID.
-     * @param lsnr Listener.
-     * @param internal Internal flag.
-     * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
-     * @return Whether listener was actually registered.
-     */
-    @SuppressWarnings("UnusedParameters")
-    boolean registerListener(UUID lsnrId,
-        GridCacheContinuousQueryListener<K, V> lsnr,
-        boolean internal,
-        boolean entryLsnr) {
-        ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr);
-
-        boolean added;
-
-        if (internal) {
-            added = intLsnrs.putIfAbsent(lsnrId, info) == null;
-
-            if (added)
-                intLsnrCnt.incrementAndGet();
-        }
-        else {
-            added = lsnrs.putIfAbsent(lsnrId, info) == null;
-
-            if (added) {
-                lsnrCnt.incrementAndGet();
-
-                lsnr.onExecution();
-            }
-        }
-
-        return added;
-    }
-
-    /**
-     * @param internal Internal flag.
-     * @param id Listener ID.
-     */
-    void unregisterListener(boolean internal, UUID id) {
-        ListenerInfo info;
-
-        if (internal) {
-            if ((info = intLsnrs.remove(id)) != null) {
-                intLsnrCnt.decrementAndGet();
-
-                info.lsnr.onUnregister();
-            }
-        }
-        else {
-            if ((info = lsnrs.remove(id)) != null) {
-                lsnrCnt.decrementAndGet();
-
-                info.lsnr.onUnregister();
-            }
-        }
-    }
-
-    /**
-     * Iterates through existing data.
-     *
-     * @param internal Internal flag.
-     * @param id Listener ID.
-     * @param keepPortable Keep portable flag.
-     */
-    @SuppressWarnings("unchecked")
-    void iterate(boolean internal, UUID id, boolean keepPortable) {
-        ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id);
-
-        assert info != null;
-
-        GridCacheProjectionImpl<K, V> oldPrj = null;
-
-        try {
-            if (keepPortable) {
-                oldPrj = cctx.projectionPerCall();
-
-                cctx.projectionPerCall(cctx.cache().<K, V>keepPortable0());
-            }
-
-            Set<Cache.Entry<K, V>> entries;
-
-            if (cctx.isReplicated())
-                entries = internal ? cctx.cache().entrySetx() :
-                    cctx.cache().entrySet();
-            else
-                entries = internal ? cctx.cache().primaryEntrySetx() :
-                    cctx.cache().primaryEntrySet();
-
-            boolean evt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
-            for (Cache.Entry<K, V> e : entries) {
-                GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx,
-                    e,
-                    e.getKey(),
-                    e.getValue(),
-                    null,
-                    null,
-                    null,
-                    CREATED);
-
-                info.onIterate(qryEntry, evt);
-            }
-
-            info.flushPending();
-        }
-        finally {
-            if (keepPortable)
-                cctx.projectionPerCall(oldPrj);
-        }
-    }
-
-    /**
-     * Listener info.
-     */
-    private static class ListenerInfo<K, V> {
-        /** Listener. */
-        private final GridCacheContinuousQueryListener<K, V> lsnr;
-
-        /** Pending entries. */
-        private Collection<PendingEntry<K, V>> pending;
-
-        /** */
-        private final boolean entryLsnr;
-
-        /**
-         * @param lsnr Listener.
-         * @param entryLsnr {@code True} if listener created for {@link CacheEntryListener}.
-         */
-        private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr) {
-            this.lsnr = lsnr;
-            this.entryLsnr = entryLsnr;
-
-            if (!entryLsnr)
-                pending = new LinkedList<>();
-        }
-
-        /**
-         * @param e Entry update callback.
-         * @param recordEvt Whether to record event.
-         */
-        void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
-            boolean notifyLsnr = true;
-
-            synchronized (this) {
-                if (pending != null) {
-                    pending.add(new PendingEntry<>(e, recordEvt));
-
-                    notifyLsnr = false;
-                }
-            }
-
-            if (notifyLsnr)
-                lsnr.onEntryUpdate(e, recordEvt);
-        }
-
-        /**
-         * @param e Entry iteration callback.
-         * @param recordEvt Whether to record event.
-         */
-        void onIterate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
-            lsnr.onEntryUpdate(e, recordEvt);
-        }
-
-        /**
-         * Flushes pending entries to listener.
-         */
-        void flushPending() {
-            Collection<PendingEntry<K, V>> pending0;
-
-            synchronized (this) {
-                pending0 = pending;
-
-                pending = null;
-            }
-
-            for (PendingEntry<K, V> e : pending0)
-                lsnr.onEntryUpdate(e.entry, e.recordEvt);
-        }
-
-        /**
-         * @return {@code True} if listener created for {@link CacheEntryListener}.
-         */
-        boolean entryListener() {
-            return entryLsnr;
-        }
-    }
-
-    /**
-     * Pending entry.
-     */
-    private static class PendingEntry<K, V> {
-        /** Entry. */
-        private final GridCacheContinuousQueryEntry<K, V> entry;
-
-        /** Whether to record event. */
-        private final boolean recordEvt;
-
-        /**
-         * @param entry Entry.
-         * @param recordEvt Whether to record event.
-         */
-        private PendingEntry(GridCacheContinuousQueryEntry<K, V> entry, boolean recordEvt) {
-            this.entry = entry;
-            this.recordEvt = recordEvt;
-        }
-    }
-
-    /**
-     *
-     */
-    static class EntryListenerFilter<K1, V1> implements
-        IgnitePredicate<CacheContinuousQueryEntry<K1, V1>>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private boolean create;
-
-        /** */
-        private boolean update;
-
-        /** */
-        private boolean rmv;
-
-        /** */
-        private boolean expire;
-
-        /** */
-        private Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory;
-
-        /** */
-        private CacheEntryEventFilter fltr;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** */
-        private IgniteCache cache;
-
-        /** */
-        private String cacheName;
-
-        /**
-         *
-         */
-        public EntryListenerFilter() {
-            // No-op.
-        }
-
-        /**
-         * @param create {@code True} if listens for create events.
-         * @param update {@code True} if listens for create events.
-         * @param rmv {@code True} if listens for remove events.
-         * @param expire {@code True} if listens for expire events.
-         * @param fltrFactory Filter factory.
-         * @param ignite Ignite instance.
-         * @param cacheName Cache name.
-         */
-        EntryListenerFilter(
-            boolean create,
-            boolean update,
-            boolean rmv,
-            boolean expire,
-            Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory,
-            Ignite ignite,
-            @Nullable String cacheName) {
-            this.create = create;
-            this.update = update;
-            this.rmv = rmv;
-            this.expire = expire;
-            this.fltrFactory = fltrFactory;
-            this.ignite = ignite;
-            this.cacheName = cacheName;
-
-            if (fltrFactory != null)
-                fltr = fltrFactory.create();
-
-            cache = ignite.jcache(cacheName);
-
-            assert cache != null : cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public boolean apply(CacheContinuousQueryEntry<K1, V1> entry) {
-            try {
-                EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType());
-
-                switch (evtType) {
-                    case EXPIRED:
-                        if (!expire)
-                            return false;
-
-                        break;
-
-                    case REMOVED:
-                        if (!rmv)
-                            return false;
-
-                        break;
-
-                    case CREATED:
-                        if (!create)
-                            return false;
-
-                        break;
-
-                    case UPDATED:
-                        if (!update)
-                            return false;
-
-                        break;
-
-                    default:
-                        assert false : evtType;
-                }
-
-                if (fltr == null)
-                    return true;
-
-                if (cache == null) {
-                    cache = ignite.jcache(cacheName);
-
-                    assert cache != null : cacheName;
-                }
-
-                return fltr.evaluate(new CacheEntryEvent(cache, evtType, entry));
-            }
-            catch (Exception e) {
-                LT.warn(ignite.log(), e, "Cache entry event filter error: " + e);
-
-                return true;
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeBoolean(create);
-
-            out.writeBoolean(update);
-
-            out.writeBoolean(rmv);
-
-            out.writeBoolean(expire);
-
-            U.writeString(out, cacheName);
-
-            out.writeObject(fltrFactory);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            create = in.readBoolean();
-
-            update = in.readBoolean();
-
-            rmv = in.readBoolean();
-
-            expire = in.readBoolean();
-
-            cacheName = U.readString(in);
-
-            fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super V1>>)in.readObject();
-
-            if (fltrFactory != null)
-                fltr = fltrFactory.create();
-        }
-    }
-
-    /**
-     *
-     */
-    private class EntryListenerCallback implements
-        IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> {
-        /** */
-        private final IgniteCacheProxy<K, V> cache;
-
-        /** */
-        private final CacheEntryCreatedListener createLsnr;
-
-        /** */
-        private final CacheEntryUpdatedListener updateLsnr;
-
-        /** */
-        private final CacheEntryRemovedListener rmvLsnr;
-
-        /** */
-        private final CacheEntryExpiredListener expireLsnr;
-
-        /**
-         * @param cache Cache to be used as event source.
-         * @param lsnr Listener.
-         */
-        EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener lsnr) {
-            this.cache = cache;
-
-            createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null;
-            updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null;
-            rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null;
-            expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null;
-        }
-
-        /**
-         * @return {@code True} if listens for create event.
-         */
-        boolean create() {
-            return createLsnr != null;
-        }
-
-        /**
-         * @return {@code True} if listens for update event.
-         */
-        boolean update() {
-            return updateLsnr != null;
-        }
-
-        /**
-         * @return {@code True} if listens for remove event.
-         */
-        boolean remove() {
-            return rmvLsnr != null;
-        }
-
-        /**
-         * @return {@code True} if listens for expire event.
-         */
-        boolean expire() {
-            return expireLsnr != null;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public boolean apply(UUID uuid,
-            Collection<CacheContinuousQueryEntry<K, V>> entries) {
-            for (CacheContinuousQueryEntry entry : entries) {
-                try {
-                    EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType());
-
-                    switch (evtType) {
-                        case EXPIRED: {
-                            assert expireLsnr != null;
-
-                            CacheEntryEvent evt0 =
-                                new CacheEntryEvent(cache, EXPIRED, entry);
-
-                            expireLsnr.onExpired(Collections.singleton(evt0));
-
-                            break;
-                        }
-
-                        case REMOVED: {
-                            assert rmvLsnr != null;
-
-                            CacheEntryEvent evt0 =
-                                new CacheEntryEvent(cache, REMOVED, entry);
-
-                            rmvLsnr.onRemoved(Collections.singleton(evt0));
-
-                            break;
-                        }
-
-                        case UPDATED: {
-                            assert updateLsnr != null;
-
-                            CacheEntryEvent evt0 =
-                                new CacheEntryEvent(cache, UPDATED, entry);
-
-                            updateLsnr.onUpdated(Collections.singleton(evt0));
-
-                            break;
-                        }
-
-                        case CREATED: {
-                            assert createLsnr != null;
-
-                            CacheEntryEvent evt0 =
-                                new CacheEntryEvent(cache, CREATED, entry);
-
-                            createLsnr.onCreated(Collections.singleton(evt0));
-
-                            break;
-                        }
-
-                        default:
-                            assert false : evtType;
-                    }
-                }
-                catch (CacheEntryListenerException e) {
-                    LT.warn(log, e, "Cache entry listener error: " + e);
-                }
-            }
-
-            return true;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java
index 5d0a9c4..ffc2057 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
@@ -25,7 +26,7 @@ import java.util.*;
 /**
  * Job ID.
  */
-public class GridHadoopJobId implements Externalizable {
+public class GridHadoopJobId implements GridCacheInternal, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 8673b48..660fd03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.service;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
@@ -27,7 +26,6 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.query.continuous.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
@@ -35,13 +33,14 @@ import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.services.*;
 import org.apache.ignite.marshaller.*;
+import org.apache.ignite.services.*;
 import org.apache.ignite.thread.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
+import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -86,11 +85,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Topology listener. */
     private GridLocalEventListener topLsnr = new TopologyListener();
 
-    /** Deployment listener. */
-    private GridCacheContinuousQueryAdapter<Object, Object> cfgQry;
+    /** Deployment listener ID. */
+    private UUID cfgQryId;
 
-    /** Assignment listener. */
-    private GridCacheContinuousQueryAdapter<Object, Object> assignQry;
+    /** Assignment listener ID. */
+    private UUID assignQryId;
 
     /**
      * @param ctx Kernal context.
@@ -119,7 +118,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (ctx.isDaemon())
             return;
 
-        cache = (GridCacheProjectionEx<Object, Object>)ctx.cache().utilityCache();
+        cache = ctx.cache().utilityCache();
 
         ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
 
@@ -127,17 +126,9 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
-            cfgQry = (GridCacheContinuousQueryAdapter<Object, 
Object>)cache.queries().createContinuousQuery();
-
-            cfgQry.localCallback(new DeploymentListener());
-
-            cfgQry.execute(ctx.grid().forLocal(), true, false, false, true);
-
-            assignQry = (GridCacheContinuousQueryAdapter<Object, 
Object>)cache.queries().createContinuousQuery();
-
-            assignQry.localCallback(new AssignmentListener());
-
-            assignQry.execute(ctx.grid().forLocal(), true, false, false, true);
+            cfgQryId = 
cache.context().continuousQueries().executeInternalQuery(new 
DeploymentListener(), null, true);
+            assignQryId = 
cache.context().continuousQueries().executeInternalQuery(
+                new AssignmentListener(), null, true);
         }
         finally {
             if (ctx.deploy().enabled())
@@ -170,21 +161,11 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
 
         ctx.event().removeLocalEventListener(topLsnr);
 
-        try {
-            if (cfgQry != null)
-                cfgQry.close();
-        }
-        catch (IgniteCheckedException e) {
-            log.error("Failed to unsubscribe service configuration 
notifications.", e);
-        }
+        if (cfgQryId != null)
+            cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
 
-        try {
-            if (assignQry != null)
-                assignQry.close();
-        }
-        catch (IgniteCheckedException e) {
-            log.error("Failed to unsubscribe service assignment 
notifications.", e);
-        }
+        if (assignQryId != null)
+            
cache.context().continuousQueries().cancelInternalQuery(assignQryId);
 
         Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
@@ -915,18 +896,12 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
     /**
      * Service deployment listener.
      */
-    private class DeploymentListener
-        implements IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<Object, Object>>> {
-        /** Serial version ID. */
-        private static final long serialVersionUID = 0L;
-
+    private class DeploymentListener implements 
CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean apply(
-            UUID nodeId,
-            final Collection<CacheContinuousQueryEntry<Object, Object>> deps) {
+        @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> 
deps) {
             depExe.submit(new BusyRunnable() {
                 @Override public void run0() {
-                    for (Map.Entry<Object, Object> e : deps) {
+                    for (CacheEntryEvent<?, ?> e : deps) {
                         if (!(e.getKey() instanceof GridServiceDeploymentKey))
                             continue;
 
@@ -988,8 +963,6 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
                     }
                 }
             });
-
-            return true;
         }
 
         /**
@@ -1193,18 +1166,12 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
     /**
      * Assignment listener.
      */
-    private class AssignmentListener
-        implements IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<Object, Object>>> {
-        /** Serial version ID. */
-        private static final long serialVersionUID = 0L;
-
+    private class AssignmentListener implements 
CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean apply(
-            UUID nodeId,
-            final Collection<CacheContinuousQueryEntry<Object, Object>> 
assignCol) {
+        @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> 
assignCol) throws CacheEntryListenerException {
             depExe.submit(new BusyRunnable() {
                 @Override public void run0() {
-                    for (Map.Entry<Object, Object> e : assignCol) {
+                    for (CacheEntryEvent<?, ?> e : assignCol) {
                         if (!(e.getKey() instanceof GridServiceAssignmentsKey))
                             continue;
 
@@ -1252,8 +1219,6 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
                     }
                 }
             });
-
-            return true;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
 
b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
index 7f65eba..bd27e00 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
+++ 
b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties
@@ -690,8 +690,8 @@ 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponseEntry
 org.apache.ignite.internal.processors.cache.query.GridCacheQueryType
 org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery
 org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery
-org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryHandler
-org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryHandler$DeployableObject
+org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler
+org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$DeployableObject
 
org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcMetadataTask
 
org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcMetadataTask$JdbcDriverMetadataJob
 org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcTask

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index db326e6..87d940a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -25,11 +25,9 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.processors.datastructures.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -44,6 +42,7 @@ import org.jetbrains.annotations.*;
 
 import javax.cache.*;
 import javax.cache.configuration.*;
+import javax.cache.event.*;
 import javax.cache.integration.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -55,8 +54,8 @@ import static org.apache.ignite.cache.CacheDistributionMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CachePreloadMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static 
org.apache.ignite.internal.processors.cache.query.CacheQueryType.*;
 import static org.apache.ignite.events.EventType.*;
+import static 
org.apache.ignite.internal.processors.cache.query.CacheQueryType.*;
 
 /**
  * Continuous queries tests.
@@ -174,7 +173,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, 
"waitForStopAck")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, 
"pending")).size());
 
-            GridCacheContinuousQueryManager mgr =
+            CacheContinuousQueryManager mgr =
                 
((IgniteKernal)grid(i)).context().cache().internalCache().context().continuousQueries();
 
             assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size());
@@ -202,14 +201,14 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
      * @throws Exception If failed.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public void testApi() throws Exception {
-        final CacheContinuousQuery<Object, Object> q = 
grid(0).cache(null).queries().createContinuousQuery();
+    public void testIllegalArguments() throws Exception {
+        final ContinuousQuery<Object, Object> q = Query.continuous();
 
         GridTestUtils.assertThrows(
             log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    q.bufferSize(-1);
+                    q.setBufferSize(-1);
 
                     return null;
                 }
@@ -218,24 +217,20 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             null
         );
 
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    q.bufferSize(0);
+                    q.setBufferSize(0);
 
                     return null;
                 }
-            },
-            IllegalArgumentException.class,
-            null
+            }, IllegalArgumentException.class, null
         );
 
         GridTestUtils.assertThrows(
             log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    q.timeInterval(-1);
+                    q.setTimeInterval(-1);
 
                     return null;
                 }
@@ -243,128 +238,22 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             IllegalArgumentException.class,
             null
         );
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.execute();
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        q.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Object, Object>>>() {
-            @Override public boolean apply(UUID uuid, 
Collection<CacheContinuousQueryEntry<Object, Object>> entries) {
-                return true;
-            }
-        });
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    
q.execute(grid(0).forPredicate(F.<ClusterNode>alwaysFalse()));
-
-                    return null;
-                }
-            },
-            ClusterTopologyCheckedException.class,
-            null
-        );
-
-        q.execute();
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Object, Object>>>() {
-                        @Override public boolean apply(UUID uuid, 
Collection<CacheContinuousQueryEntry<Object, Object>> entries) {
-                            return false;
-                        }
-                    });
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.remoteFilter(null);
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.bufferSize(10);
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.timeInterval(10);
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
-
-        q.close();
-
-        GridTestUtils.assertThrows(
-            log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    q.execute();
-
-                    return null;
-                }
-            },
-            IllegalStateException.class,
-            null
-        );
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAllEntries() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(5);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> 
entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : 
evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -379,21 +268,17 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        try {
-            qry.execute();
-
-            cache.putx(1, 1);
-            cache.putx(2, 2);
-            cache.putx(3, 3);
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(2, 2);
+            cache.put(3, 3);
 
-            cache.removex(2);
+            cache.remove(2);
 
-            cache.putx(1, 10);
+            cache.put(1, 10);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -419,25 +304,22 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(1, vals.size());
             assertEquals(3, (int)vals.get(0));
         }
-        finally {
-            qry.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testEntriesByFilter() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(4);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : 
evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -452,46 +334,26 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() 
{
-            @Override public boolean apply(CacheContinuousQueryEntry<Integer, 
Integer> e) {
-                return e.getKey() > 2;
+        qry.setRemoteFilter(new CacheEntryEventFilter<Integer, Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends 
Integer, ? extends Integer> evt) {
+                return evt.getKey() > 2;
             }
         });
 
-        // Second query to wait for notifications about all updates.
-        CacheContinuousQuery<Integer, Integer> qry0 = 
cache.queries().createContinuousQuery();
-
-        final CountDownLatch latch0 = new CountDownLatch(8);
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(2, 2);
+            cache.put(3, 3);
+            cache.put(4, 4);
 
-        qry0.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID uuid,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> 
entries) {
-                for (Map.Entry<Integer, Integer> ignored : entries)
-                    latch0.countDown();
+            cache.remove(2);
+            cache.remove(3);
 
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
-            qry0.execute();
-
-            cache.putx(1, 1);
-            cache.putx(2, 2);
-            cache.putx(3, 3);
-            cache.putx(4, 4);
-
-            cache.removex(2);
-            cache.removex(3);
-
-            cache.putx(1, 10);
-            cache.putx(4, 40);
+            cache.put(1, 10);
+            cache.put(4, 40);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -510,91 +372,6 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(2, vals.size());
             assertEquals(4, (int)vals.get(0));
             assertEquals(40, (int)vals.get(1));
-
-            assert latch0.await(2, SECONDS);
-        }
-        finally {
-            qry.close();
-            qry0.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testProjection() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        // Queries for non-partitioned caches always run locally.
-        if (cache.configuration().getCacheMode() != PARTITIONED)
-            return;
-
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
-
-        final Map<Integer, List<Integer>> map = new HashMap<>();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    synchronized (map) {
-                        List<Integer> vals = map.get(e.getKey());
-
-                        if (vals == null) {
-                            vals = new ArrayList<>();
-
-                            map.put(e.getKey(), vals);
-                        }
-
-                        vals.add(e.getValue());
-                    }
-
-                    latch.countDown();
-                }
-
-                return true;
-            }
-        });
-
-        try {
-            qry.execute(grid(0).forRemotes());
-
-            int locKey = -1;
-            int rmtKey = -1;
-
-            int key = 0;
-
-            while (true) {
-                ClusterNode n = grid(0).mapKeyToNode(null, key);
-
-                assert n != null;
-
-                if (n.equals(grid(0).localNode()))
-                    locKey = key;
-                else
-                    rmtKey = key;
-
-                key++;
-
-                if (locKey >= 0 && rmtKey >= 0)
-                    break;
-            }
-
-            cache.putx(locKey, 1);
-            cache.putx(rmtKey, 2);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(1, map.size());
-
-            List<Integer> vals = map.get(rmtKey);
-
-            assertNotNull(vals);
-            assertEquals(1, vals.size());
-            assertEquals(2, (int)vals.get(0));
-        }
-        finally {
-            qry.close();
         }
     }
 
@@ -602,20 +379,19 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testLocalNodeOnly() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        // Queries for non-partitioned caches always run locally.
-        if (cache.configuration().getCacheMode() != PARTITIONED)
+        if (grid(0).cache(null).configuration().getCacheMode() != PARTITIONED)
             return;
 
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(1);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : 
evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -630,14 +406,10 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        try {
-            qry.execute(grid(0).forLocal());
-
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.localQuery(qry)) {
             int locKey = -1;
             int rmtKey = -1;
 
@@ -659,8 +431,8 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                     break;
             }
 
-            cache.putx(locKey, 1);
-            cache.putx(rmtKey, 2);
+            cache.put(locKey, 1);
+            cache.put(rmtKey, 2);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -672,103 +444,25 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(1, vals.size());
             assertEquals(1, (int)vals.get(0));
         }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStopByCallback() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
-
-        final Map<Integer, List<Integer>> map = new HashMap<>();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    synchronized (map) {
-                        List<Integer> vals = map.get(e.getKey());
-
-                        if (vals == null) {
-                            vals = new ArrayList<>();
-
-                            map.put(e.getKey(), vals);
-                        }
-
-                        vals.add(e.getValue());
-                    }
-
-                    latch.countDown();
-                }
-
-                return false;
-            }
-        });
-
-        // Second query to wait for notifications about all updates.
-        CacheContinuousQuery<Integer, Integer> qry0 = 
cache.queries().createContinuousQuery();
-
-        final CountDownLatch latch0 = new CountDownLatch(3);
-
-        qry0.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> 
entries) {
-                for (Map.Entry<Integer, Integer> ignored : entries)
-                    latch0.countDown();
-
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
-            qry0.execute();
-
-            cache.putx(1, 1);
-            cache.putx(2, 2);
-            cache.putx(3, 3);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(1, map.size());
-
-            List<Integer> list = F.first(map.values());
-
-            assert list != null;
-
-            assertEquals(1, list.size());
-
-            assert latch0.await(2, SECONDS);
-        }
-        finally {
-            qry.close();
-            qry0.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testBuffering() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        // Buffering make sense only for remote nodes, so test only for 
partitioned cache.
-        if (cache.configuration().getCacheMode() != PARTITIONED)
+        if (grid(0).cache(null).configuration().getCacheMode() != PARTITIONED)
             return;
 
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
+
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(5);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : 
evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -783,18 +477,14 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        qry.bufferSize(5);
+        qry.setBufferSize(5);
 
-        try {
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {
             ClusterNode node = F.first(grid(0).forRemotes().nodes());
 
-            qry.execute(grid(0).forNode(node));
-
             Collection<Integer> keys = new HashSet<>();
 
             int key = 0;
@@ -816,12 +506,12 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             Iterator<Integer> it = keys.iterator();
 
             for (int i = 0; i < 4; i++)
-                cache.putx(it.next(), 0);
+                cache.put(it.next(), 0);
 
             assert !latch.await(2, SECONDS);
 
             for (int i = 0; i < 2; i++)
-                cache.putx(it.next(), 0);
+                cache.put(it.next(), 0);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -839,29 +529,25 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 assertEquals(0, (int)vals.get(0));
             }
         }
-        finally {
-            qry.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testTimeInterval() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        // Buffering make sense only for remote nodes, so test only for 
partitioned cache.
-        if (cache.configuration().getCacheMode() != PARTITIONED)
+        if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() != 
PARTITIONED)
             return;
 
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, List<Integer>> map = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(5);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : 
evts) {
                     synchronized (map) {
                         List<Integer> vals = map.get(e.getKey());
 
@@ -876,15 +562,13 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        qry.bufferSize(10);
-        qry.timeInterval(3000);
+        qry.setBufferSize(10);
+        qry.setTimeInterval(3000);
 
-        try {
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {
             ClusterNode node = F.first(grid(0).forRemotes().nodes());
 
             Collection<Integer> keys = new HashSet<>();
@@ -906,9 +590,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
 
             for (Integer k : keys)
-                cache.putx(k, 0);
-
-            qry.execute(grid(0).forNode(node));
+                cache.put(k, 0);
 
             assert !latch.await(2, SECONDS);
             assert latch.await(1000 + LATCH_TIMEOUT, MILLISECONDS);
@@ -927,296 +609,145 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 assertEquals(0, (int)vals.get(0));
             }
         }
-        finally {
-            qry.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testIteration() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+    public void testInitialPredicate() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(10);
-
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> 
entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    map.put(e.getKey(), e.getValue());
-
-                    latch.countDown();
-                }
+        qry.setInitialPredicate(Query.scan(new P2<Integer, Integer>() {
+            @Override public boolean apply(Integer k, Integer v) {
+                return k >= 5;
+            }
+        }));
 
-                return true;
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                assert false;
             }
         });
 
-        try {
-            for (int i = 0; i < 10; i++)
-                cache.putx(i, i);
-
-            qry.execute();
+        for (int i = 0; i < 10; i++)
+            cache.put(i, i);
 
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(10, map.size());
+        try (QueryCursor<Cache.Entry<Integer, Integer>> cur = 
cache.query(qry)) {
+            List<Cache.Entry<Integer, Integer>> res = cur.getAll();
 
-            for (int i = 0; i < 10; i++)
-                assertEquals(i, (int)map.get(i));
-        }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testIterationAndUpdates() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
-
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(12);
-
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    map.put(e.getKey(), e.getValue());
-
-                    latch.countDown();
+            Collections.sort(res, new Comparator<Cache.Entry<Integer, 
Integer>>() {
+                @Override public int compare(Cache.Entry<Integer, Integer> e1, 
Cache.Entry<Integer, Integer> e2) {
+                    return e1.getKey().compareTo(e2.getKey());
                 }
+            });
 
-                return true;
-            }
-        });
-
-        try {
-            for (int i = 0; i < 10; i++)
-                cache.putx(i, i);
-
-            qry.execute();
-
-            cache.putx(10, 10);
-            cache.putx(11, 11);
+            assertEquals(5, res.size());
 
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount();
+            int exp = 5;
 
-            assertEquals(12, map.size());
+            for (Cache.Entry<Integer, Integer> e : res) {
+                assertEquals(exp, e.getKey().intValue());
+                assertEquals(exp, e.getValue().intValue());
 
-            for (int i = 0; i < 12; i++)
-                assertEquals(i, (int)map.get(i));
-        }
-        finally {
-            qry.close();
+                exp++;
+            }
         }
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testLoadCache() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
+    public void testInitialPredicateAndUpdates() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(10);
-
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
-                    map.put(e.getKey(), e.getValue());
-
-                    latch.countDown();
-                }
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
-                return true;
+        qry.setInitialPredicate(Query.scan(new P2<Integer, Integer>() {
+            @Override public boolean apply(Integer k, Integer v) {
+                return k >= 5;
             }
-        });
-
-        try {
-            qry.execute();
-
-            for (int i = 0; i < gridCount(); i++)
-                grid(i).cache(null).loadCache(null, 0);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + 
latch.getCount();
-
-            assertEquals(10, map.size());
-
-            for (int i = 0; i < 10; i++)
-                assertEquals(i, (int)map.get(i));
-        }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTypedProjection() throws Exception {
-        GridCache<Object, Object> cache = grid(0).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry =
-            cache.projection(Integer.class, 
Integer.class).queries().createContinuousQuery();
+        }));
 
         final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
         final CountDownLatch latch = new CountDownLatch(2);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : 
evts) {
                     map.put(e.getKey(), e.getValue());
 
                     latch.countDown();
                 }
-
-                return true;
-            }
-        });
-
-        qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() 
{
-            @Override public boolean apply(CacheContinuousQueryEntry<Integer, 
Integer> e) {
-                return true;
             }
         });
 
-        try {
-            qry.execute();
-
-            cache.putx(1, 1);
-            cache.putx("a", "a");
-            cache.putx(2, 2);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(2, map.size());
-
-            assertEquals(1, (int)map.get(1));
-            assertEquals(2, (int)map.get(2));
-        }
-        finally {
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEntryFilterProjection() throws Exception {
-        CacheProjection<Integer, Integer> cache = grid(0).cache(null);
+        for (int i = 0; i < 10; i++)
+            cache.put(i, i);
 
-        CacheContinuousQuery<Integer, Integer> qry = cache.projection(
-            new P1<Cache.Entry<Integer, Integer>>() {
-                @Override public boolean apply(Cache.Entry<Integer, Integer> 
e) {
-                    Integer i = e.getValue();
+        try (QueryCursor<Cache.Entry<Integer, Integer>> cur = 
cache.query(qry)) {
+            List<Cache.Entry<Integer, Integer>> res = cur.getAll();
 
-                    return i != null && i > 10;
+            Collections.sort(res, new Comparator<Cache.Entry<Integer, 
Integer>>() {
+                @Override public int compare(Cache.Entry<Integer, Integer> e1, 
Cache.Entry<Integer, Integer> e2) {
+                    return e1.getKey().compareTo(e2.getKey());
                 }
-            }).queries().createContinuousQuery();
-
-        final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(2);
+            });
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (CacheContinuousQueryEntry<Integer, Integer> e : entries) {
-                    info("Query entry: " + e);
+            assertEquals(5, res.size());
 
-                    map.put(e.getKey(), e.getValue());
+            int exp = 5;
 
-                    latch.countDown();
-                }
+            for (Cache.Entry<Integer, Integer> e : res) {
+                assertEquals(exp, e.getKey().intValue());
+                assertEquals(exp, e.getValue().intValue());
 
-                return true;
+                exp++;
             }
-        });
 
-        qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() 
{
-            @Override public boolean apply(CacheContinuousQueryEntry<Integer, 
Integer> e) {
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
-
-            cache.putx(1, 1);
-            cache.putx(11, 11);
-            cache.putx(2, 2);
-            cache.putx(22, 22);
+            cache.put(10, 10);
+            cache.put(11, 11);
 
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount();
 
-            assertEquals("Invalid number of entries notified: " + map, 2, 
map.size());
+            assertEquals(2, map.size());
 
-            assertEquals(11, (int)map.get(11));
-            assertEquals(22, (int)map.get(22));
-        }
-        finally {
-            qry.close();
+            for (int i = 11; i < 12; i++)
+                assertEquals(i, (int)map.get(i));
         }
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testKeyValueFilterProjection() throws Exception {
-        CacheProjection<Integer, Integer> cache = grid(0).cache(null);
+    public void testLoadCache() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = cache.projection(
-            new P2<Integer, Integer>() {
-                @Override public boolean apply(Integer key, Integer val) {
-                    return val > 10;
-                }
-            }).queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(2);
+        final CountDownLatch latch = new CountDownLatch(10);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (Map.Entry<Integer, Integer> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : 
evts) {
                     map.put(e.getKey(), e.getValue());
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() 
{
-            @Override public boolean apply(CacheContinuousQueryEntry<Integer, 
Integer> e) {
-                return true;
-            }
-        });
-
-        try {
-            qry.execute();
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {
+            cache.loadCache(null, 0);
 
-            cache.putx(1, 1);
-            cache.putx(11, 11);
-            cache.putx(2, 2);
-            cache.putx(22, 22);
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + 
latch.getCount();
 
-            assertEquals(2, map.size());
+            assertEquals(10, map.size());
 
-            assertEquals(11, (int)map.get(11));
-            assertEquals(22, (int)map.get(22));
-        }
-        finally {
-            qry.close();
+            for (int i = 0; i < 10; i++)
+                assertEquals(i, (int)map.get(i));
         }
     }
 
@@ -1227,33 +758,28 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         if (atomicityMode() == ATOMIC)
             return;
 
-        GridCache<Object, Object> cache = grid(0).cache(null);
+        IgniteCache<Object, Object> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Object, Object> qry = 
cache.queries().createContinuousQuery();
+        ContinuousQuery<Object, Object> qry = Query.continuous();
 
         final Map<Object, Object> map = new ConcurrentHashMap8<>();
         final CountDownLatch latch = new CountDownLatch(2);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Object, Object>>>() {
-            @Override public boolean apply(UUID nodeId,
-                Collection<CacheContinuousQueryEntry<Object, Object>> entries) 
{
-                for (Map.Entry<Object, Object> e : entries) {
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> 
evts) {
+                for (CacheEntryEvent<?, ?> e : evts) {
                     map.put(e.getKey(), e.getValue());
 
                     latch.countDown();
                 }
-
-                return true;
             }
         });
 
-        try {
-            qry.execute();
-
-            cache.putx(new GridCacheInternalKeyImpl("test"), 1);
+        try (QueryCursor<Cache.Entry<Object, Object>> ignored = 
cache.query(qry)) {
+            cache.put(new GridCacheInternalKeyImpl("test"), 1);
 
-            cache.putx(1, 1);
-            cache.putx(2, 2);
+            cache.put(1, 1);
+            cache.put(2, 2);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -1262,42 +788,41 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(1, (int)map.get(1));
             assertEquals(2, (int)map.get(2));
         }
-        finally {
-            qry.close();
-        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNodeJoin() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-        CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
-        final Collection<Map.Entry<Integer, Integer>> all = new 
ConcurrentLinkedDeque8<>();
+        final Collection<CacheEntryEvent<? extends Integer, ? extends 
Integer>> all = new ConcurrentLinkedDeque8<>();
         final CountDownLatch latch = new CountDownLatch(2);
 
-        qry.localCallback(new P2<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeId, 
Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                assertEquals(1, entries.size());
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
+                int size = 0;
 
-                all.addAll(entries);
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt 
: evts) {
+                    all.add(evt);
 
-                latch.countDown();
+                    size++;
+                }
 
-                return true;
+                assertEquals(1, size);
+
+                latch.countDown();
             }
         });
 
-        qry.execute();
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {
+            cache.put(1, 1);
 
-        cache.putx(1, 1);
-
-        try {
             startGrid("anotherGrid");
 
-            cache.putx(2, 2);
+            cache.put(2, 2);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : all;
 
@@ -1305,63 +830,6 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         }
         finally {
             stopGrid("anotherGrid");
-
-            qry.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCallbackForPreload() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
-
-        if (cache.configuration().getCacheMode() == LOCAL)
-            return;
-
-        Map<Integer, Integer> map = new HashMap<>();
-
-        final int keysCnt = 1000;
-
-        for (int i = 0; i < keysCnt; i++)
-            map.put(i, i);
-
-        cache.putAll(map);
-
-        Ignite ignite = startGrid("anotherGrid");
-
-        try {
-            cache = ignite.cache(null);
-
-            CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery();
-
-            final CountDownLatch latch = new CountDownLatch(1);
-            final Collection<Integer> keys = new GridConcurrentHashSet<>();
-
-            qry.localCallback(new IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-                @Override public boolean apply(UUID nodeId,
-                    Collection<CacheContinuousQueryEntry<Integer, Integer>> 
entries) {
-                    for (Map.Entry<Integer, Integer> e : entries) {
-                        keys.add(e.getKey());
-
-                        if (keys.size() >= keysCnt)
-                            latch.countDown();
-                    }
-
-                    return true;
-                }
-            });
-
-            qry.execute();
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-            assertEquals(keysCnt, keys.size());
-
-            qry.close();
-        }
-        finally {
-            stopGrid("anotherGrid");
         }
     }
 
@@ -1426,26 +894,25 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 grid(i).events().localListen(execLsnr, 
EVT_CACHE_QUERY_EXECUTED);
             }
 
-            GridCache<Integer, Integer> cache = grid(0).cache(null);
+            IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
-            try (CacheContinuousQuery<Integer, Integer> qry = 
cache.queries().createContinuousQuery()) {
-                qry.localCallback(new IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-                    @Override public boolean apply(UUID uuid,
-                        Collection<CacheContinuousQueryEntry<Integer, 
Integer>> entries) {
-                        return true;
-                    }
-                });
+            ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
-                qry.remoteFilter(new 
IgnitePredicate<CacheContinuousQueryEntry<Integer, Integer>>() {
-                    @Override public boolean 
apply(CacheContinuousQueryEntry<Integer, Integer> e) {
-                        return e.getValue() >= 50;
-                    }
-                });
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, 
Integer>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends Integer, ? extends Integer>> evts) {
+                    // No-op.
+                }
+            });
 
-                qry.execute();
+            qry.setRemoteFilter(new CacheEntryEventFilter<Integer, Integer>() {
+                @Override public boolean evaluate(CacheEntryEvent<? extends 
Integer, ? extends Integer> evt) {
+                    return evt.getValue() >= 50;
+                }
+            });
 
+            try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {
                 for (int i = 0; i < 100; i++)
-                    cache.putx(i, i);
+                    cache.put(i, i);
 
                 assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
                 assert execLatch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -1466,8 +933,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
      */
     private static class TestStore extends CacheStoreAdapter<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo,
-            Object... args) {
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, 
Object... args) {
             for (int i = 0; i < 10; i++)
                 clo.apply(i, i);
         }

Reply via email to