Repository: incubator-ignite Updated Branches: refs/heads/ignite-sql-old 3b8f9a64d -> b42fdcd24
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java index f9c7ee2..5d0d84d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java @@ -17,11 +17,12 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; +import javax.cache.*; +import javax.cache.event.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -46,40 +47,38 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu /** * @throws Exception If failed. */ - @SuppressWarnings("unchecked") public void testRemoteNodeCallback() throws Exception { - GridCache<Integer, Integer> cache1 = grid(0).cache(null); + IgniteCache<Integer, Integer> cache1 = grid(0).jcache(null); + IgniteCache<Integer, Integer> cache2 = grid(1).jcache(null); - GridCache<Integer, Integer> cache2 = grid(1).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache2.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final AtomicReference<Integer> val = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID uuid, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - assertEquals(1, entries.size()); + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> it = evts.iterator(); + + CacheEntryEvent<? extends Integer, ? extends Integer> e = it.next(); - Map.Entry<Integer, Integer> e = entries.iterator().next(); + assert !it.hasNext(); - log.info("Entry: " + e); + log.info("Event: " + e); val.set(e.getValue()); latch.countDown(); - - return false; } }); - qry.execute(); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache2.query(qry)) { + cache1.put(1, 10); - cache1.put(1, 10); + latch.await(LATCH_TIMEOUT, MILLISECONDS); - latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(10, val.get().intValue()); + assertEquals(10, val.get().intValue()); + } } /** @@ -87,61 +86,49 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu * * @throws Exception If failed. */ - @SuppressWarnings("unchecked") public void testCrossCallback() throws Exception { // Prepare. - GridCache<Integer, Integer> cache1 = grid(0).cache(null); - GridCache<Integer, Integer> cache2 = grid(1).cache(null); + IgniteCache<Integer, Integer> cache1 = grid(0).jcache(null); + IgniteCache<Integer, Integer> cache2 = grid(1).jcache(null); - final int key1 = primaryKey(jcache(0)); - final int key2 = primaryKey(jcache(1)); + final int key1 = primaryKey(cache1); + final int key2 = primaryKey(cache2); final CountDownLatch latch1 = new CountDownLatch(2); final CountDownLatch latch2 = new CountDownLatch(2); + ContinuousQuery<Integer, Integer> qry1 = Query.continuous(); - // Start query on the first node. - CacheContinuousQuery<Integer, Integer> qry1 = cache1.queries().createContinuousQuery(); - - qry1.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeID, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (CacheContinuousQueryEntry entry : entries) { - log.info("Update in cache 1: " + entry); + qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + log.info("Update in cache 1: " + evt); - if (entry.getKey() == key1 || entry.getKey() == key2) - latch1.countDown(); + if (evt.getKey() == key1 || evt.getKey() == key2) latch1.countDown(); } - - return latch1.getCount() != 0; } }); - qry1.execute(); - - // Start query on the second node. - CacheContinuousQuery<Integer, Integer> qry2 = cache2.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry2 = Query.continuous(); - qry2.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeID, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (CacheContinuousQueryEntry entry : entries) { - log.info("Update in cache 2: " + entry); + qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + log.info("Update in cache 2: " + evt); - if (entry.getKey() == key1 || entry.getKey() == key2) + if (evt.getKey() == key1 || evt.getKey() == key2) latch2.countDown(); } - - return latch2.getCount() != 0; } }); - qry2.execute(); - - cache1.put(key1, key1); - cache1.put(key2, key2); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache2.query(qry1); + QueryCursor<Cache.Entry<Integer, Integer>> ignore = cache2.query(qry2)) { + cache1.put(key1, key1); + cache1.put(key2, key2); - assert latch1.await(LATCH_TIMEOUT, MILLISECONDS); - assert latch2.await(LATCH_TIMEOUT, MILLISECONDS); + assert latch1.await(LATCH_TIMEOUT, MILLISECONDS); + assert latch2.await(LATCH_TIMEOUT, MILLISECONDS); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java index d06d53f..6fe1cc5 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java @@ -18,18 +18,18 @@ package org.apache.ignite.loadtests.continuous; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.continuous.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -79,17 +79,17 @@ public class GridContinuousOperationsLoadTest { dumpProperties(System.out); try (Ignite ignite = Ignition.start(cfgPath)) { - final GridCache<Object, Object> cache = ignite.cache(cacheName); + final IgniteCache<Object, Object> cache = ignite.jcache(cacheName); if (cache == null) throw new IgniteCheckedException("Cache is not configured: " + cacheName); // Continuous query manager, used to monitor queue size. - final GridCacheContinuousQueryManager contQryMgr = + final CacheContinuousQueryManager contQryMgr = ((GridCacheAdapter)((GridCacheProxyImpl)cache).cache()).context().continuousQueries(); if (contQryMgr == null) - throw new IgniteCheckedException("Could not access GridCacheContinuousQueryManager"); + throw new IgniteCheckedException("Could not access CacheContinuousQueryManager"); final AtomicBoolean stop = new AtomicBoolean(); // Stop flag. final AtomicLong cbCntr = new AtomicLong(); // Callback counter. @@ -97,33 +97,43 @@ public class GridContinuousOperationsLoadTest { for (int i = 0; i < parallelCnt; i++) { if (useQry) { - CacheContinuousQuery<Object, Object> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Object, Object> qry = Query.continuous(); - qry.localCallback(new PX2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() { - @Override public boolean applyx(UUID uuid, Collection<CacheContinuousQueryEntry<Object, Object>> entries) - throws IgniteInterruptedCheckedException { - if (cbSleepMs > 0) - U.sleep(cbSleepMs); - - cbCntr.addAndGet(entries.size()); + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + if (cbSleepMs > 0) { + try { + U.sleep(cbSleepMs); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException(e); + } + } - return true; // Continue listening. + for (CacheEntryEvent<?, ?> ignored : evts) + cbCntr.incrementAndGet(); } }); - qry.remoteFilter(new IgnitePredicateX<CacheContinuousQueryEntry<Object, Object>>() { - @Override public boolean applyx(CacheContinuousQueryEntry e) throws IgniteInterruptedCheckedException { - if (filterSleepMs > 0) - U.sleep(filterSleepMs); + qry.setRemoteFilter(new CacheEntryEventFilter<Object, Object>() { + @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) { + if (filterSleepMs > 0) { + try { + U.sleep(filterSleepMs); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException(e); + } + } return Math.random() * 100 >= filterSkipProb; } }); - qry.bufferSize(bufSize); - qry.timeInterval(timeInterval); + qry.setBufferSize(bufSize); + qry.setTimeInterval(timeInterval); - qry.execute(); + cache.query(qry); } else { ignite.events().remoteListen( @@ -188,7 +198,7 @@ public class GridContinuousOperationsLoadTest { while (!stop.get() && !Thread.currentThread().isInterrupted()) { Integer key = rnd.nextInt(keyRange); - cache.putx(key, val); + cache.put(key, val); updCntr.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 884ceca..8ba8966 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -64,7 +64,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new CacheConfiguration()), new GridCacheEvictionManager<K, V>(), new GridCacheLocalQueryManager<K, V>(), - new GridCacheContinuousQueryManager<K, V>(), + new CacheContinuousQueryManager<K, V>(), new GridCacheAffinityManager<K, V>(), new CacheDataStructuresManager<K, V>(), new GridCacheTtlManager<K, V>(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java index 3af9835..6ec4058 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java @@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.hadoop.jobtracker; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.events.*; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; @@ -36,6 +36,7 @@ import org.apache.ignite.lang.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import javax.cache.expiry.*; import javax.cache.processor.*; import java.io.*; @@ -135,8 +136,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl())); - finishedJobMetaPrj = ((GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)prj). - withExpiryPolicy(finishedJobPlc); + finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc); } else finishedJobMetaPrj = jobMetaPrj; @@ -169,14 +169,12 @@ public class GridHadoopJobTracker extends GridHadoopComponent { @Override public void onKernalStart() throws IgniteCheckedException { super.onKernalStart(); - CacheContinuousQuery<GridHadoopJobId, GridHadoopJobMetadata> qry = jobMetaCache().queries().createContinuousQuery(); - - qry.localCallback( - new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<GridHadoopJobId, GridHadoopJobMetadata>>>() { - @Override public boolean apply(UUID nodeId, - final Collection<CacheContinuousQueryEntry<GridHadoopJobId, GridHadoopJobMetadata>> evts) { + jobMetaCache().context().continuousQueries().executeInternalQuery( + new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() { + @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId, + ? extends GridHadoopJobMetadata>> evts) { if (!busyLock.tryReadLock()) - return false; + return; try { // Must process query callback in a separate thread to avoid deadlocks. @@ -185,16 +183,15 @@ public class GridHadoopJobTracker extends GridHadoopComponent { processJobMetadataUpdates(evts); } }); - - return true; } finally { busyLock.readUnlock(); } } - }); - - qry.execute(ctx.kernalContext().grid().forLocal()); + }, + null, + true + ); ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(final Event evt) { @@ -629,11 +626,11 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @throws IgniteCheckedException If failed. */ private void processJobMetadataUpdates( - Iterable<CacheContinuousQueryEntry<GridHadoopJobId, GridHadoopJobMetadata>> updated) + Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata>> updated) throws IgniteCheckedException { UUID locNodeId = ctx.localNodeId(); - for (Map.Entry<GridHadoopJobId, GridHadoopJobMetadata> entry : updated) { + for (CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata> entry : updated) { GridHadoopJobId jobId = entry.getKey(); GridHadoopJobMetadata meta = entry.getValue();