http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java deleted file mode 100644 index 7bea056..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -/** - * Continuous query listener. - */ -interface GridCacheContinuousQueryListener<K, V> { - /** - * Query execution callback. - */ - public void onExecution(); - - /** - * Entry update callback. - * - * @param e Entry. - * @param recordEvt Whether to record event. - */ - public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt); - - /** - * Listener unregistered callback. - */ - public void onUnregister(); -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java deleted file mode 100644 index 23913e4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java +++ /dev/null @@ -1,784 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.internal.processors.cache.CacheEntryEvent; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import javax.cache.event.*; -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static javax.cache.event.EventType.*; -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.internal.GridTopic.*; - -/** - * Continuous queries manager. - */ -public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { - /** Ordered topic prefix. */ - private String topicPrefix; - - /** Listeners. */ - private final ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrs = new ConcurrentHashMap8<>(); - - /** Listeners count. */ - private final AtomicInteger lsnrCnt = new AtomicInteger(); - - /** Internal entries listeners. */ - private final ConcurrentMap<UUID, ListenerInfo<K, V>> intLsnrs = new ConcurrentHashMap8<>(); - - /** Internal listeners count. */ - private final AtomicInteger intLsnrCnt = new AtomicInteger(); - - /** Query sequence number for message topic. */ - private final AtomicLong seq = new AtomicLong(); - - /** Continues queries created for cache event listeners. */ - private final ConcurrentMap<CacheEntryListenerConfiguration, CacheContinuousQuery<K, V>> lsnrQrys = - new ConcurrentHashMap8<>(); - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - // Append cache name to the topic. - topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected void onKernalStart0() throws IgniteCheckedException { - Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations(); - - if (lsnrCfgs != null) { - for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs) - registerCacheEntryListener(cfg, false); - } - } - - /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel) { - super.onKernalStop0(cancel); - - for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) { - try { - deregisterCacheEntryListener(lsnrCfg); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to remove cache entry listener: " + e); - } - } - } - - /** - * @param prjPred Projection predicate. - * @return New continuous query. - */ - public CacheContinuousQuery<K, V> createQuery(@Nullable IgnitePredicate<Cache.Entry<K, V>> prjPred) { - Object topic = TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()); - - return new GridCacheContinuousQueryAdapter<>(cctx, topic, prjPred); - } - - /** - * @param e Cache entry. - * @param key Key. - * @param newVal New value. - * @param newBytes New value bytes. - * @param oldVal Old value. - * @param oldBytes Old value bytes. - * @param preload {@code True} if entry is updated during preloading. - * @throws IgniteCheckedException In case of error. - */ - public void onEntryUpdate(GridCacheEntryEx<K, V> e, - K key, - @Nullable V newVal, - @Nullable GridCacheValueBytes newBytes, - V oldVal, - @Nullable GridCacheValueBytes oldBytes, - boolean preload) throws IgniteCheckedException { - assert e != null; - assert key != null; - - ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol; - - if (e.isInternal()) - lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null; - else - lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; - - if (F.isEmpty(lsnrCol)) - return; - - oldVal = cctx.unwrapTemporary(oldVal); - - EventType evtType = newVal == null ? REMOVED : - ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? UPDATED : CREATED)); - - GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( - cctx, - e.wrap(), - key, - newVal, - newBytes, - oldVal, - oldBytes, - evtType); - - e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader()); - - boolean recordEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - - for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { - if (preload && lsnr.entryListener()) - continue; - - lsnr.onEntryUpdate(e0, recordEvt); - } - } - - /** - * @param e Entry. - * @param key Key. - * @param oldVal Old value. - * @param oldBytes Old value bytes. - */ - public void onEntryExpired(GridCacheEntryEx<K, V> e, - K key, - V oldVal, - @Nullable GridCacheValueBytes oldBytes) { - if (e.isInternal()) - return; - - ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol = lsnrs; - - if (F.isEmpty(lsnrCol)) - return; - - if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) { - GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( - cctx, - e.wrap(), - key, - null, - null, - oldVal, - oldBytes, - EXPIRED); - - for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { - if (!lsnr.entryListener()) - continue; - - lsnr.onEntryUpdate(e0, false); - } - } - } - - /** - * @param lsnrCfg Listener configuration. - * @param addToCfg If {@code true} adds listener configuration to cache configuration. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg, boolean addToCfg) - throws IgniteCheckedException { - GridCacheContinuousQueryAdapter<K, V> qry = null; - - try { - A.notNull(lsnrCfg, "lsnrCfg"); - - Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory(); - - A.notNull(factory, "cacheEntryListenerFactory"); - - CacheEntryListener lsnr = factory.create(); - - A.notNull(lsnr, "lsnr"); - - IgniteCacheProxy<K, V> cache= cctx.kernalContext().cache().jcache(cctx.name()); - - EntryListenerCallback cb = new EntryListenerCallback(cache, lsnr); - - if (!(cb.create() || cb.update() || cb.remove() || cb.expire())) - throw new IllegalArgumentException("Listener must implement one of CacheEntryListener sub-interfaces."); - - qry = (GridCacheContinuousQueryAdapter<K, V>)cctx.cache().queries().createContinuousQuery(); - - CacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, qry); - - if (old != null) - throw new IllegalArgumentException("Listener is already registered for configuration: " + lsnrCfg); - - qry.autoUnsubscribe(true); - - qry.bufferSize(1); - - qry.localCallback(cb); - - EntryListenerFilter<K, V> fltr = new EntryListenerFilter<>(cb.create(), - cb.update(), - cb.remove(), - cb.expire(), - lsnrCfg.getCacheEntryEventFilterFactory(), - cctx.kernalContext().grid(), - cctx.name()); - - qry.remoteFilter(fltr); - - qry.execute(null, false, true, lsnrCfg.isSynchronous(), lsnrCfg.isOldValueRequired()); - - if (addToCfg) - cctx.config().addCacheEntryListenerConfiguration(lsnrCfg); - } - catch (IgniteCheckedException e) { - lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it. - - throw e; - } - } - - /** - * @param lsnrCfg Listener configuration. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) throws IgniteCheckedException { - A.notNull(lsnrCfg, "lsnrCfg"); - - CacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg); - - if (qry != null) { - cctx.config().removeCacheEntryListenerConfiguration(lsnrCfg); - - qry.close(); - } - } - - /** - * @param lsnrId Listener ID. - * @param lsnr Listener. - * @param internal Internal flag. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @return Whether listener was actually registered. - */ - @SuppressWarnings("UnusedParameters") - boolean registerListener(UUID lsnrId, - GridCacheContinuousQueryListener<K, V> lsnr, - boolean internal, - boolean entryLsnr) { - ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr); - - boolean added; - - if (internal) { - added = intLsnrs.putIfAbsent(lsnrId, info) == null; - - if (added) - intLsnrCnt.incrementAndGet(); - } - else { - added = lsnrs.putIfAbsent(lsnrId, info) == null; - - if (added) { - lsnrCnt.incrementAndGet(); - - lsnr.onExecution(); - } - } - - return added; - } - - /** - * @param internal Internal flag. - * @param id Listener ID. - */ - void unregisterListener(boolean internal, UUID id) { - ListenerInfo info; - - if (internal) { - if ((info = intLsnrs.remove(id)) != null) { - intLsnrCnt.decrementAndGet(); - - info.lsnr.onUnregister(); - } - } - else { - if ((info = lsnrs.remove(id)) != null) { - lsnrCnt.decrementAndGet(); - - info.lsnr.onUnregister(); - } - } - } - - /** - * Iterates through existing data. - * - * @param internal Internal flag. - * @param id Listener ID. - * @param keepPortable Keep portable flag. - */ - @SuppressWarnings("unchecked") - void iterate(boolean internal, UUID id, boolean keepPortable) { - ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id); - - assert info != null; - - GridCacheProjectionImpl<K, V> oldPrj = null; - - try { - if (keepPortable) { - oldPrj = cctx.projectionPerCall(); - - cctx.projectionPerCall(cctx.cache().<K, V>keepPortable0()); - } - - Set<Cache.Entry<K, V>> entries; - - if (cctx.isReplicated()) - entries = internal ? cctx.cache().entrySetx() : - cctx.cache().entrySet(); - else - entries = internal ? cctx.cache().primaryEntrySetx() : - cctx.cache().primaryEntrySet(); - - boolean evt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - - for (Cache.Entry<K, V> e : entries) { - GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx, - e, - e.getKey(), - e.getValue(), - null, - null, - null, - CREATED); - - info.onIterate(qryEntry, evt); - } - - info.flushPending(); - } - finally { - if (keepPortable) - cctx.projectionPerCall(oldPrj); - } - } - - /** - * Listener info. - */ - private static class ListenerInfo<K, V> { - /** Listener. */ - private final GridCacheContinuousQueryListener<K, V> lsnr; - - /** Pending entries. */ - private Collection<PendingEntry<K, V>> pending; - - /** */ - private final boolean entryLsnr; - - /** - * @param lsnr Listener. - * @param entryLsnr {@code True} if listener created for {@link CacheEntryListener}. - */ - private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr) { - this.lsnr = lsnr; - this.entryLsnr = entryLsnr; - - if (!entryLsnr) - pending = new LinkedList<>(); - } - - /** - * @param e Entry update callback. - * @param recordEvt Whether to record event. - */ - void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { - boolean notifyLsnr = true; - - synchronized (this) { - if (pending != null) { - pending.add(new PendingEntry<>(e, recordEvt)); - - notifyLsnr = false; - } - } - - if (notifyLsnr) - lsnr.onEntryUpdate(e, recordEvt); - } - - /** - * @param e Entry iteration callback. - * @param recordEvt Whether to record event. - */ - void onIterate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { - lsnr.onEntryUpdate(e, recordEvt); - } - - /** - * Flushes pending entries to listener. - */ - void flushPending() { - Collection<PendingEntry<K, V>> pending0; - - synchronized (this) { - pending0 = pending; - - pending = null; - } - - for (PendingEntry<K, V> e : pending0) - lsnr.onEntryUpdate(e.entry, e.recordEvt); - } - - /** - * @return {@code True} if listener created for {@link CacheEntryListener}. - */ - boolean entryListener() { - return entryLsnr; - } - } - - /** - * Pending entry. - */ - private static class PendingEntry<K, V> { - /** Entry. */ - private final GridCacheContinuousQueryEntry<K, V> entry; - - /** Whether to record event. */ - private final boolean recordEvt; - - /** - * @param entry Entry. - * @param recordEvt Whether to record event. - */ - private PendingEntry(GridCacheContinuousQueryEntry<K, V> entry, boolean recordEvt) { - this.entry = entry; - this.recordEvt = recordEvt; - } - } - - /** - * - */ - static class EntryListenerFilter<K1, V1> implements - IgnitePredicate<CacheContinuousQueryEntry<K1, V1>>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private boolean create; - - /** */ - private boolean update; - - /** */ - private boolean rmv; - - /** */ - private boolean expire; - - /** */ - private Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory; - - /** */ - private CacheEntryEventFilter fltr; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - private IgniteCache cache; - - /** */ - private String cacheName; - - /** - * - */ - public EntryListenerFilter() { - // No-op. - } - - /** - * @param create {@code True} if listens for create events. - * @param update {@code True} if listens for create events. - * @param rmv {@code True} if listens for remove events. - * @param expire {@code True} if listens for expire events. - * @param fltrFactory Filter factory. - * @param ignite Ignite instance. - * @param cacheName Cache name. - */ - EntryListenerFilter( - boolean create, - boolean update, - boolean rmv, - boolean expire, - Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory, - Ignite ignite, - @Nullable String cacheName) { - this.create = create; - this.update = update; - this.rmv = rmv; - this.expire = expire; - this.fltrFactory = fltrFactory; - this.ignite = ignite; - this.cacheName = cacheName; - - if (fltrFactory != null) - fltr = fltrFactory.create(); - - cache = ignite.jcache(cacheName); - - assert cache != null : cacheName; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean apply(CacheContinuousQueryEntry<K1, V1> entry) { - try { - EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); - - switch (evtType) { - case EXPIRED: - if (!expire) - return false; - - break; - - case REMOVED: - if (!rmv) - return false; - - break; - - case CREATED: - if (!create) - return false; - - break; - - case UPDATED: - if (!update) - return false; - - break; - - default: - assert false : evtType; - } - - if (fltr == null) - return true; - - if (cache == null) { - cache = ignite.jcache(cacheName); - - assert cache != null : cacheName; - } - - return fltr.evaluate(new CacheEntryEvent(cache, evtType, entry)); - } - catch (Exception e) { - LT.warn(ignite.log(), e, "Cache entry event filter error: " + e); - - return true; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(create); - - out.writeBoolean(update); - - out.writeBoolean(rmv); - - out.writeBoolean(expire); - - U.writeString(out, cacheName); - - out.writeObject(fltrFactory); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - create = in.readBoolean(); - - update = in.readBoolean(); - - rmv = in.readBoolean(); - - expire = in.readBoolean(); - - cacheName = U.readString(in); - - fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super V1>>)in.readObject(); - - if (fltrFactory != null) - fltr = fltrFactory.create(); - } - } - - /** - * - */ - private class EntryListenerCallback implements - IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> { - /** */ - private final IgniteCacheProxy<K, V> cache; - - /** */ - private final CacheEntryCreatedListener createLsnr; - - /** */ - private final CacheEntryUpdatedListener updateLsnr; - - /** */ - private final CacheEntryRemovedListener rmvLsnr; - - /** */ - private final CacheEntryExpiredListener expireLsnr; - - /** - * @param cache Cache to be used as event source. - * @param lsnr Listener. - */ - EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener lsnr) { - this.cache = cache; - - createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null; - updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null; - rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null; - expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null; - } - - /** - * @return {@code True} if listens for create event. - */ - boolean create() { - return createLsnr != null; - } - - /** - * @return {@code True} if listens for update event. - */ - boolean update() { - return updateLsnr != null; - } - - /** - * @return {@code True} if listens for remove event. - */ - boolean remove() { - return rmvLsnr != null; - } - - /** - * @return {@code True} if listens for expire event. - */ - boolean expire() { - return expireLsnr != null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean apply(UUID uuid, - Collection<CacheContinuousQueryEntry<K, V>> entries) { - for (CacheContinuousQueryEntry entry : entries) { - try { - EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); - - switch (evtType) { - case EXPIRED: { - assert expireLsnr != null; - - CacheEntryEvent evt0 = - new CacheEntryEvent(cache, EXPIRED, entry); - - expireLsnr.onExpired(Collections.singleton(evt0)); - - break; - } - - case REMOVED: { - assert rmvLsnr != null; - - CacheEntryEvent evt0 = - new CacheEntryEvent(cache, REMOVED, entry); - - rmvLsnr.onRemoved(Collections.singleton(evt0)); - - break; - } - - case UPDATED: { - assert updateLsnr != null; - - CacheEntryEvent evt0 = - new CacheEntryEvent(cache, UPDATED, entry); - - updateLsnr.onUpdated(Collections.singleton(evt0)); - - break; - } - - case CREATED: { - assert createLsnr != null; - - CacheEntryEvent evt0 = - new CacheEntryEvent(cache, CREATED, entry); - - createLsnr.onCreated(Collections.singleton(evt0)); - - break; - } - - default: - assert false : evtType; - } - } - catch (CacheEntryListenerException e) { - LT.warn(log, e, "Cache entry listener error: " + e); - } - } - - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java index 5d0a9c4..ffc2057 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.hadoop; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; @@ -25,7 +26,7 @@ import java.util.*; /** * Job ID. */ -public class GridHadoopJobId implements Externalizable { +public class GridHadoopJobId implements GridCacheInternal, Externalizable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 8673b48..660fd03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.service; import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; @@ -27,7 +26,6 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -35,13 +33,14 @@ import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.services.*; import org.apache.ignite.marshaller.*; +import org.apache.ignite.services.*; import org.apache.ignite.thread.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; import javax.cache.*; +import javax.cache.event.*; import java.util.*; import java.util.concurrent.*; @@ -86,11 +85,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** Topology listener. */ private GridLocalEventListener topLsnr = new TopologyListener(); - /** Deployment listener. */ - private GridCacheContinuousQueryAdapter<Object, Object> cfgQry; + /** Deployment listener ID. */ + private UUID cfgQryId; - /** Assignment listener. */ - private GridCacheContinuousQueryAdapter<Object, Object> assignQry; + /** Assignment listener ID. */ + private UUID assignQryId; /** * @param ctx Kernal context. @@ -119,7 +118,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (ctx.isDaemon()) return; - cache = (GridCacheProjectionEx<Object, Object>)ctx.cache().utilityCache(); + cache = ctx.cache().utilityCache(); ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY); @@ -127,17 +126,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (ctx.deploy().enabled()) ctx.cache().context().deploy().ignoreOwnership(true); - cfgQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery(); - - cfgQry.localCallback(new DeploymentListener()); - - cfgQry.execute(ctx.grid().forLocal(), true, false, false, true); - - assignQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery(); - - assignQry.localCallback(new AssignmentListener()); - - assignQry.execute(ctx.grid().forLocal(), true, false, false, true); + cfgQryId = cache.context().continuousQueries().executeInternalQuery(new DeploymentListener(), null, true); + assignQryId = cache.context().continuousQueries().executeInternalQuery( + new AssignmentListener(), null, true); } finally { if (ctx.deploy().enabled()) @@ -170,21 +161,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.event().removeLocalEventListener(topLsnr); - try { - if (cfgQry != null) - cfgQry.close(); - } - catch (IgniteCheckedException e) { - log.error("Failed to unsubscribe service configuration notifications.", e); - } + if (cfgQryId != null) + cache.context().continuousQueries().cancelInternalQuery(cfgQryId); - try { - if (assignQry != null) - assignQry.close(); - } - catch (IgniteCheckedException e) { - log.error("Failed to unsubscribe service assignment notifications.", e); - } + if (assignQryId != null) + cache.context().continuousQueries().cancelInternalQuery(assignQryId); Collection<ServiceContextImpl> ctxs = new ArrayList<>(); @@ -915,18 +896,12 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** * Service deployment listener. */ - private class DeploymentListener - implements IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>> { - /** Serial version ID. */ - private static final long serialVersionUID = 0L; - + private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> { /** {@inheritDoc} */ - @Override public boolean apply( - UUID nodeId, - final Collection<CacheContinuousQueryEntry<Object, Object>> deps) { + @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) { depExe.submit(new BusyRunnable() { @Override public void run0() { - for (Map.Entry<Object, Object> e : deps) { + for (CacheEntryEvent<?, ?> e : deps) { if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; @@ -988,8 +963,6 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } }); - - return true; } /** @@ -1193,18 +1166,12 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** * Assignment listener. */ - private class AssignmentListener - implements IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>> { - /** Serial version ID. */ - private static final long serialVersionUID = 0L; - + private class AssignmentListener implements CacheEntryUpdatedListener<Object, Object> { /** {@inheritDoc} */ - @Override public boolean apply( - UUID nodeId, - final Collection<CacheContinuousQueryEntry<Object, Object>> assignCol) { + @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> assignCol) throws CacheEntryListenerException { depExe.submit(new BusyRunnable() { @Override public void run0() { - for (Map.Entry<Object, Object> e : assignCol) { + for (CacheEntryEvent<?, ?> e : assignCol) { if (!(e.getKey() instanceof GridServiceAssignmentsKey)) continue; @@ -1252,8 +1219,6 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } }); - - return true; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties index 7f65eba..bd27e00 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties @@ -690,8 +690,8 @@ org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponseEntry org.apache.ignite.internal.processors.cache.query.GridCacheQueryType org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery -org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryHandler -org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryHandler$DeployableObject +org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler +org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$DeployableObject org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcMetadataTask org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcMetadataTask$JdbcDriverMetadataJob org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcTask http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index db326e6..87d940a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -25,11 +25,9 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.processors.datastructures.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -44,6 +42,7 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.configuration.*; +import javax.cache.event.*; import javax.cache.integration.*; import java.util.*; import java.util.concurrent.*; @@ -55,8 +54,8 @@ import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CachePreloadMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*; /** * Continuous queries tests. @@ -174,7 +173,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size()); - GridCacheContinuousQueryManager mgr = + CacheContinuousQueryManager mgr = ((IgniteKernal)grid(i)).context().cache().internalCache().context().continuousQueries(); assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size()); @@ -202,14 +201,14 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo * @throws Exception If failed. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testApi() throws Exception { - final CacheContinuousQuery<Object, Object> q = grid(0).cache(null).queries().createContinuousQuery(); + public void testIllegalArguments() throws Exception { + final ContinuousQuery<Object, Object> q = Query.continuous(); GridTestUtils.assertThrows( log, new Callable<Object>() { @Override public Object call() throws Exception { - q.bufferSize(-1); + q.setBufferSize(-1); return null; } @@ -218,24 +217,20 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo null ); - GridTestUtils.assertThrows( - log, - new Callable<Object>() { + GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - q.bufferSize(0); + q.setBufferSize(0); return null; } - }, - IllegalArgumentException.class, - null + }, IllegalArgumentException.class, null ); GridTestUtils.assertThrows( log, new Callable<Object>() { @Override public Object call() throws Exception { - q.timeInterval(-1); + q.setTimeInterval(-1); return null; } @@ -243,128 +238,22 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo IllegalArgumentException.class, null ); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.execute(); - - return null; - } - }, - IllegalStateException.class, - null - ); - - q.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() { - @Override public boolean apply(UUID uuid, Collection<CacheContinuousQueryEntry<Object, Object>> entries) { - return true; - } - }); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.execute(grid(0).forPredicate(F.<ClusterNode>alwaysFalse())); - - return null; - } - }, - ClusterTopologyCheckedException.class, - null - ); - - q.execute(); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() { - @Override public boolean apply(UUID uuid, Collection<CacheContinuousQueryEntry<Object, Object>> entries) { - return false; - } - }); - - return null; - } - }, - IllegalStateException.class, - null - ); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.remoteFilter(null); - - return null; - } - }, - IllegalStateException.class, - null - ); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.bufferSize(10); - - return null; - } - }, - IllegalStateException.class, - null - ); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.timeInterval(10); - - return null; - } - }, - IllegalStateException.class, - null - ); - - q.close(); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.execute(); - - return null; - } - }, - IllegalStateException.class, - null - ); } /** * @throws Exception If failed. */ public void testAllEntries() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(5); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -379,21 +268,17 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - try { - qry.execute(); - - cache.putx(1, 1); - cache.putx(2, 2); - cache.putx(3, 3); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.put(1, 1); + cache.put(2, 2); + cache.put(3, 3); - cache.removex(2); + cache.remove(2); - cache.putx(1, 10); + cache.put(1, 10); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -419,25 +304,22 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(1, vals.size()); assertEquals(3, (int)vals.get(0)); } - finally { - qry.close(); - } } /** * @throws Exception If failed. */ public void testEntriesByFilter() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(4); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -452,46 +334,26 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return e.getKey() > 2; + qry.setRemoteFilter(new CacheEntryEventFilter<Integer, Integer>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + return evt.getKey() > 2; } }); - // Second query to wait for notifications about all updates. - CacheContinuousQuery<Integer, Integer> qry0 = cache.queries().createContinuousQuery(); - - final CountDownLatch latch0 = new CountDownLatch(8); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.put(1, 1); + cache.put(2, 2); + cache.put(3, 3); + cache.put(4, 4); - qry0.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID uuid, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> ignored : entries) - latch0.countDown(); + cache.remove(2); + cache.remove(3); - return true; - } - }); - - try { - qry.execute(); - qry0.execute(); - - cache.putx(1, 1); - cache.putx(2, 2); - cache.putx(3, 3); - cache.putx(4, 4); - - cache.removex(2); - cache.removex(3); - - cache.putx(1, 10); - cache.putx(4, 40); + cache.put(1, 10); + cache.put(4, 40); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -510,91 +372,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(2, vals.size()); assertEquals(4, (int)vals.get(0)); assertEquals(40, (int)vals.get(1)); - - assert latch0.await(2, SECONDS); - } - finally { - qry.close(); - qry0.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testProjection() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - // Queries for non-partitioned caches always run locally. - if (cache.configuration().getCacheMode() != PARTITIONED) - return; - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final Map<Integer, List<Integer>> map = new HashMap<>(); - final CountDownLatch latch = new CountDownLatch(1); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - synchronized (map) { - List<Integer> vals = map.get(e.getKey()); - - if (vals == null) { - vals = new ArrayList<>(); - - map.put(e.getKey(), vals); - } - - vals.add(e.getValue()); - } - - latch.countDown(); - } - - return true; - } - }); - - try { - qry.execute(grid(0).forRemotes()); - - int locKey = -1; - int rmtKey = -1; - - int key = 0; - - while (true) { - ClusterNode n = grid(0).mapKeyToNode(null, key); - - assert n != null; - - if (n.equals(grid(0).localNode())) - locKey = key; - else - rmtKey = key; - - key++; - - if (locKey >= 0 && rmtKey >= 0) - break; - } - - cache.putx(locKey, 1); - cache.putx(rmtKey, 2); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(1, map.size()); - - List<Integer> vals = map.get(rmtKey); - - assertNotNull(vals); - assertEquals(1, vals.size()); - assertEquals(2, (int)vals.get(0)); - } - finally { - qry.close(); } } @@ -602,20 +379,19 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo * @throws Exception If failed. */ public void testLocalNodeOnly() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - // Queries for non-partitioned caches always run locally. - if (cache.configuration().getCacheMode() != PARTITIONED) + if (grid(0).cache(null).configuration().getCacheMode() != PARTITIONED) return; - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(1); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -630,14 +406,10 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - try { - qry.execute(grid(0).forLocal()); - + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.localQuery(qry)) { int locKey = -1; int rmtKey = -1; @@ -659,8 +431,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo break; } - cache.putx(locKey, 1); - cache.putx(rmtKey, 2); + cache.put(locKey, 1); + cache.put(rmtKey, 2); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -672,103 +444,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(1, vals.size()); assertEquals(1, (int)vals.get(0)); } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testStopByCallback() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final Map<Integer, List<Integer>> map = new HashMap<>(); - final CountDownLatch latch = new CountDownLatch(1); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - synchronized (map) { - List<Integer> vals = map.get(e.getKey()); - - if (vals == null) { - vals = new ArrayList<>(); - - map.put(e.getKey(), vals); - } - - vals.add(e.getValue()); - } - - latch.countDown(); - } - - return false; - } - }); - - // Second query to wait for notifications about all updates. - CacheContinuousQuery<Integer, Integer> qry0 = cache.queries().createContinuousQuery(); - - final CountDownLatch latch0 = new CountDownLatch(3); - - qry0.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> ignored : entries) - latch0.countDown(); - - return true; - } - }); - - try { - qry.execute(); - qry0.execute(); - - cache.putx(1, 1); - cache.putx(2, 2); - cache.putx(3, 3); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(1, map.size()); - - List<Integer> list = F.first(map.values()); - - assert list != null; - - assertEquals(1, list.size()); - - assert latch0.await(2, SECONDS); - } - finally { - qry.close(); - qry0.close(); - } } /** * @throws Exception If failed. */ public void testBuffering() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - // Buffering make sense only for remote nodes, so test only for partitioned cache. - if (cache.configuration().getCacheMode() != PARTITIONED) + if (grid(0).cache(null).configuration().getCacheMode() != PARTITIONED) return; - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(5); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -783,18 +477,14 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - qry.bufferSize(5); + qry.setBufferSize(5); - try { + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { ClusterNode node = F.first(grid(0).forRemotes().nodes()); - qry.execute(grid(0).forNode(node)); - Collection<Integer> keys = new HashSet<>(); int key = 0; @@ -816,12 +506,12 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo Iterator<Integer> it = keys.iterator(); for (int i = 0; i < 4; i++) - cache.putx(it.next(), 0); + cache.put(it.next(), 0); assert !latch.await(2, SECONDS); for (int i = 0; i < 2; i++) - cache.putx(it.next(), 0); + cache.put(it.next(), 0); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -839,29 +529,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(0, (int)vals.get(0)); } } - finally { - qry.close(); - } } /** * @throws Exception If failed. */ public void testTimeInterval() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - // Buffering make sense only for remote nodes, so test only for partitioned cache. - if (cache.configuration().getCacheMode() != PARTITIONED) + if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() != PARTITIONED) return; - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(5); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -876,15 +562,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - qry.bufferSize(10); - qry.timeInterval(3000); + qry.setBufferSize(10); + qry.setTimeInterval(3000); - try { + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { ClusterNode node = F.first(grid(0).forRemotes().nodes()); Collection<Integer> keys = new HashSet<>(); @@ -906,9 +590,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } for (Integer k : keys) - cache.putx(k, 0); - - qry.execute(grid(0).forNode(node)); + cache.put(k, 0); assert !latch.await(2, SECONDS); assert latch.await(1000 + LATCH_TIMEOUT, MILLISECONDS); @@ -927,296 +609,145 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(0, (int)vals.get(0)); } } - finally { - qry.close(); - } } /** * @throws Exception If failed. */ - public void testIteration() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + public void testInitialPredicate() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(10); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - map.put(e.getKey(), e.getValue()); - - latch.countDown(); - } + qry.setInitialPredicate(Query.scan(new P2<Integer, Integer>() { + @Override public boolean apply(Integer k, Integer v) { + return k >= 5; + } + })); - return true; + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + assert false; } }); - try { - for (int i = 0; i < 10; i++) - cache.putx(i, i); - - qry.execute(); + for (int i = 0; i < 10; i++) + cache.put(i, i); - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(10, map.size()); + try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry)) { + List<Cache.Entry<Integer, Integer>> res = cur.getAll(); - for (int i = 0; i < 10; i++) - assertEquals(i, (int)map.get(i)); - } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testIterationAndUpdates() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(12); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - map.put(e.getKey(), e.getValue()); - - latch.countDown(); + Collections.sort(res, new Comparator<Cache.Entry<Integer, Integer>>() { + @Override public int compare(Cache.Entry<Integer, Integer> e1, Cache.Entry<Integer, Integer> e2) { + return e1.getKey().compareTo(e2.getKey()); } + }); - return true; - } - }); - - try { - for (int i = 0; i < 10; i++) - cache.putx(i, i); - - qry.execute(); - - cache.putx(10, 10); - cache.putx(11, 11); + assertEquals(5, res.size()); - assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount(); + int exp = 5; - assertEquals(12, map.size()); + for (Cache.Entry<Integer, Integer> e : res) { + assertEquals(exp, e.getKey().intValue()); + assertEquals(exp, e.getValue().intValue()); - for (int i = 0; i < 12; i++) - assertEquals(i, (int)map.get(i)); - } - finally { - qry.close(); + exp++; + } } } /** * @throws Exception If failed. */ - public void testLoadCache() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + public void testInitialPredicateAndUpdates() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(10); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - map.put(e.getKey(), e.getValue()); - - latch.countDown(); - } + ContinuousQuery<Integer, Integer> qry = Query.continuous(); - return true; + qry.setInitialPredicate(Query.scan(new P2<Integer, Integer>() { + @Override public boolean apply(Integer k, Integer v) { + return k >= 5; } - }); - - try { - qry.execute(); - - for (int i = 0; i < gridCount(); i++) - grid(i).cache(null).loadCache(null, 0); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount(); - - assertEquals(10, map.size()); - - for (int i = 0; i < 10; i++) - assertEquals(i, (int)map.get(i)); - } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTypedProjection() throws Exception { - GridCache<Object, Object> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = - cache.projection(Integer.class, Integer.class).queries().createContinuousQuery(); + })); final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); final CountDownLatch latch = new CountDownLatch(2); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { map.put(e.getKey(), e.getValue()); latch.countDown(); } - - return true; - } - }); - - qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return true; } }); - try { - qry.execute(); - - cache.putx(1, 1); - cache.putx("a", "a"); - cache.putx(2, 2); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(2, map.size()); - - assertEquals(1, (int)map.get(1)); - assertEquals(2, (int)map.get(2)); - } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testEntryFilterProjection() throws Exception { - CacheProjection<Integer, Integer> cache = grid(0).cache(null); + for (int i = 0; i < 10; i++) + cache.put(i, i); - CacheContinuousQuery<Integer, Integer> qry = cache.projection( - new P1<Cache.Entry<Integer, Integer>>() { - @Override public boolean apply(Cache.Entry<Integer, Integer> e) { - Integer i = e.getValue(); + try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry)) { + List<Cache.Entry<Integer, Integer>> res = cur.getAll(); - return i != null && i > 10; + Collections.sort(res, new Comparator<Cache.Entry<Integer, Integer>>() { + @Override public int compare(Cache.Entry<Integer, Integer> e1, Cache.Entry<Integer, Integer> e2) { + return e1.getKey().compareTo(e2.getKey()); } - }).queries().createContinuousQuery(); - - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(2); + }); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (CacheContinuousQueryEntry<Integer, Integer> e : entries) { - info("Query entry: " + e); + assertEquals(5, res.size()); - map.put(e.getKey(), e.getValue()); + int exp = 5; - latch.countDown(); - } + for (Cache.Entry<Integer, Integer> e : res) { + assertEquals(exp, e.getKey().intValue()); + assertEquals(exp, e.getValue().intValue()); - return true; + exp++; } - }); - qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return true; - } - }); - - try { - qry.execute(); - - cache.putx(1, 1); - cache.putx(11, 11); - cache.putx(2, 2); - cache.putx(22, 22); + cache.put(10, 10); + cache.put(11, 11); - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); + assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount(); - assertEquals("Invalid number of entries notified: " + map, 2, map.size()); + assertEquals(2, map.size()); - assertEquals(11, (int)map.get(11)); - assertEquals(22, (int)map.get(22)); - } - finally { - qry.close(); + for (int i = 11; i < 12; i++) + assertEquals(i, (int)map.get(i)); } } /** * @throws Exception If failed. */ - public void testKeyValueFilterProjection() throws Exception { - CacheProjection<Integer, Integer> cache = grid(0).cache(null); + public void testLoadCache() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.projection( - new P2<Integer, Integer>() { - @Override public boolean apply(Integer key, Integer val) { - return val > 10; - } - }).queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(2); + final CountDownLatch latch = new CountDownLatch(10); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { map.put(e.getKey(), e.getValue()); latch.countDown(); } - - return true; } }); - qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return true; - } - }); - - try { - qry.execute(); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.loadCache(null, 0); - cache.putx(1, 1); - cache.putx(11, 11); - cache.putx(2, 2); - cache.putx(22, 22); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); + assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount(); - assertEquals(2, map.size()); + assertEquals(10, map.size()); - assertEquals(11, (int)map.get(11)); - assertEquals(22, (int)map.get(22)); - } - finally { - qry.close(); + for (int i = 0; i < 10; i++) + assertEquals(i, (int)map.get(i)); } } @@ -1227,33 +758,28 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo if (atomicityMode() == ATOMIC) return; - GridCache<Object, Object> cache = grid(0).cache(null); + IgniteCache<Object, Object> cache = grid(0).jcache(null); - CacheContinuousQuery<Object, Object> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Object, Object> qry = Query.continuous(); final Map<Object, Object> map = new ConcurrentHashMap8<>(); final CountDownLatch latch = new CountDownLatch(2); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Object, Object>> entries) { - for (Map.Entry<Object, Object> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> e : evts) { map.put(e.getKey(), e.getValue()); latch.countDown(); } - - return true; } }); - try { - qry.execute(); - - cache.putx(new GridCacheInternalKeyImpl("test"), 1); + try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) { + cache.put(new GridCacheInternalKeyImpl("test"), 1); - cache.putx(1, 1); - cache.putx(2, 2); + cache.put(1, 1); + cache.put(2, 2); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -1262,42 +788,41 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(1, (int)map.get(1)); assertEquals(2, (int)map.get(2)); } - finally { - qry.close(); - } } /** * @throws Exception If failed. */ public void testNodeJoin() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); - final Collection<Map.Entry<Integer, Integer>> all = new ConcurrentLinkedDeque8<>(); + final Collection<CacheEntryEvent<? extends Integer, ? extends Integer>> all = new ConcurrentLinkedDeque8<>(); final CountDownLatch latch = new CountDownLatch(2); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - assertEquals(1, entries.size()); + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + int size = 0; - all.addAll(entries); + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + all.add(evt); - latch.countDown(); + size++; + } - return true; + assertEquals(1, size); + + latch.countDown(); } }); - qry.execute(); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.put(1, 1); - cache.putx(1, 1); - - try { startGrid("anotherGrid"); - cache.putx(2, 2); + cache.put(2, 2); assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : all; @@ -1305,63 +830,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } finally { stopGrid("anotherGrid"); - - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testCallbackForPreload() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - if (cache.configuration().getCacheMode() == LOCAL) - return; - - Map<Integer, Integer> map = new HashMap<>(); - - final int keysCnt = 1000; - - for (int i = 0; i < keysCnt; i++) - map.put(i, i); - - cache.putAll(map); - - Ignite ignite = startGrid("anotherGrid"); - - try { - cache = ignite.cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final CountDownLatch latch = new CountDownLatch(1); - final Collection<Integer> keys = new GridConcurrentHashSet<>(); - - qry.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - keys.add(e.getKey()); - - if (keys.size() >= keysCnt) - latch.countDown(); - } - - return true; - } - }); - - qry.execute(); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(keysCnt, keys.size()); - - qry.close(); - } - finally { - stopGrid("anotherGrid"); } } @@ -1426,26 +894,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo grid(i).events().localListen(execLsnr, EVT_CACHE_QUERY_EXECUTED); } - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - try (CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery()) { - qry.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID uuid, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - return true; - } - }); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); - qry.remoteFilter(new IgnitePredicate<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return e.getValue() >= 50; - } - }); + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + // No-op. + } + }); - qry.execute(); + qry.setRemoteFilter(new CacheEntryEventFilter<Integer, Integer>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + return evt.getValue() >= 50; + } + }); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { for (int i = 0; i < 100; i++) - cache.putx(i, i); + cache.put(i, i); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); assert execLatch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -1466,8 +933,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo */ private static class TestStore extends CacheStoreAdapter<Object, Object> { /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, - Object... args) { + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { for (int i = 0; i < 10; i++) clo.apply(i, i); }