Repository: incubator-ignite Updated Branches: refs/heads/ignite-426 88a92639d -> 37c3fe77e
# ignite-426 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37c3fe77 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37c3fe77 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37c3fe77 Branch: refs/heads/ignite-426 Commit: 37c3fe77ede827cb758c683f15112fa5ec00a846 Parents: 88a9263 Author: sboikov <sboi...@gridgain.com> Authored: Mon Aug 17 16:17:03 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Aug 17 17:00:40 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 5 +- ...acheContinuousQueryFailoverAbstractTest.java | 144 +++++++++++++++++-- ...inuousQueryFailoverAtomicReplicatedTest.java | 6 + 3 files changed, 142 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37c3fe77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 82e06db..14173d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -129,7 +129,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> private final boolean rawRetval; /** Fast map flag. */ - private boolean fastMap; + private final boolean fastMap; /** */ private boolean fastMapRemap; @@ -729,9 +729,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } - if (fastMap && futVer == null) - fastMap = cctx.topology().rebalanceFinished(topVer); - if (futVer == null) // Assign future version in topology read lock before first exception may be thrown. futVer = cctx.versions().next(topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37c3fe77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index 151ae33..ce21968 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -260,7 +260,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo Affinity<Object> aff = qryClient.affinity(null); - CacheEventListener1 lsnr = new CacheEventListener1(); + CacheEventListener1 lsnr = new CacheEventListener1(false); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -455,7 +455,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo Ignite qryClient = startGrid(2); - CacheEventListener1 lsnr = new CacheEventListener1(); + CacheEventListener1 lsnr = new CacheEventListener1(false); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -487,7 +487,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo } }, 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty()); + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); @@ -525,7 +525,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo public void testBackupQueueCleanupServerQuery() throws Exception { Ignite qryClient = startGridsMultiThreaded(2); - CacheEventListener1 lsnr = new CacheEventListener1(); + CacheEventListener1 lsnr = new CacheEventListener1(false); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -557,7 +557,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo } }, 3000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty()); + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); @@ -746,6 +746,105 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo } /** + * @throws Exception If failed. + */ + public void testMultiThreaded() throws Exception { + final int SRV_NODES = 3; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + Ignite qryClient = startGrid(SRV_NODES); + + final IgniteCache<Object, Object> cache = qryClient.cache(null); + + CacheEventListener1 lsnr = new CacheEventListener1(true); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = cache.query(qry); + + client = false; + + final int SRV_IDX = SRV_NODES - 1; + + List<Integer> keys = primaryKeys(ignite(SRV_IDX).cache(null), 10); + + final int THREADS = 10; + + for (int i = 0; i < keys.size(); i++) { + log.info("Iteration: " + i); + + Ignite srv = ignite(SRV_IDX); + + TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + + spi.sndFirstOnly = new AtomicBoolean(false); + + final Integer key = keys.get(i); + + final AtomicInteger val = new AtomicInteger(); + + CountDownLatch latch = new CountDownLatch(THREADS); + + lsnr.latch = latch; + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Integer val0 = val.getAndIncrement(); + + cache.put(key, val0); + + return null; + } + }, THREADS, "update-thread"); + + fut.get(); + + stopGrid(SRV_IDX); + + if (!latch.await(5, SECONDS)) + fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']'); + + assertEquals(THREADS, lsnr.allEvts.size()); + + Set<Integer> vals = new HashSet<>(); + + boolean err = false; + + for (CacheEntryEvent<?, ?> evt : lsnr.allEvts) { + assertEquals(key, evt.getKey()); + assertNotNull(evt.getValue()); + + if (!vals.add((Integer)evt.getValue())) { + err = true; + + log.info("Extra event: " + evt); + } + } + + for (int v = 0; v < THREADS; v++) { + if (!vals.contains(v)) { + err = true; + + log.info("Event for value not received: " + v); + } + } + + assertFalse("Invalid events, see log for details.", err); + + lsnr.allEvts.clear(); + + startGrid(SRV_IDX); + } + + cur.close(); + } + + /** * @param logAll If {@code true} logs all unexpected values. * @param expEvts Expected values. * @param lsnr Listener. @@ -822,21 +921,36 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>(); /** */ + private List<CacheEntryEvent<?, ?>> allEvts; + + /** */ @LoggerResource private IgniteLogger log; + /** + * @param saveAll Save all events flag. + */ + CacheEventListener1(boolean saveAll) { + if (saveAll) + allEvts = new ArrayList<>(); + } + /** {@inheritDoc} */ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { try { for (CacheEntryEvent<?, ?> evt : evts) { CountDownLatch latch = this.latch; - log.info("Received cache event: " + evt + " " + (latch != null ? latch.getCount() : null)); + log.info("Received cache event [evt=" + evt + + ", left=" + (latch != null ? latch.getCount() : null) + ']'); this.evts.put(evt.getKey(), evt); keys.add((Integer) evt.getKey()); + if (allEvts != null) + allEvts.add(evt); + assertTrue(latch != null); assertTrue(latch.getCount() > 0); @@ -933,17 +1047,29 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** */ private volatile boolean skipMsg; + /** */ + private volatile AtomicBoolean sndFirstOnly; + /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { - if (skipMsg && msg instanceof GridIoMessage) { - Object msg0 = ((GridIoMessage)msg).message(); + Object msg0 = ((GridIoMessage)msg).message(); - if (msg0 instanceof GridContinuousMessage) { + if (msg0 instanceof GridContinuousMessage) { + if (skipMsg) { log.info("Skip continuous message: " + msg0); return; } + else { + AtomicBoolean sndFirstOnly = this.sndFirstOnly; + + if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) { + log.info("Skip continuous message: " + msg0); + + return; + } + } } super.sendMessage(node, msg, ackC); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37c3fe77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java index c8209d9..61f0560 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import org.apache.ignite.cache.*; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheMode.*; /** @@ -29,4 +30,9 @@ public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheConti @Override protected CacheMode cacheMode() { return REPLICATED; } + + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode writeOrderMode() { + return PRIMARY; + } }