Repository: incubator-ignite Updated Branches: refs/heads/ignite-426 1e8d8aed4 -> eee8ea416
# ignite-426 add tests Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eee8ea41 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eee8ea41 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eee8ea41 Branch: refs/heads/ignite-426 Commit: eee8ea4169047a2788aa7674aa9ff65a063b3897 Parents: 1e8d8ae Author: sboikov <sboi...@gridgain.com> Authored: Thu Aug 13 11:57:36 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Aug 13 15:46:08 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 6 +- .../continuous/CacheContinuousQueryHandler.java | 147 +++--- .../continuous/CacheContinuousQueryManager.java | 36 +- ...acheContinuousQueryFailoverAbstractTest.java | 457 +++++++++++++++++-- ...ueryFailoverAtomicPrimaryWriteOrderTest.java | 32 ++ ...inuousQueryFailoverAtomicReplicatedTest.java | 32 ++ ...ContinuousQueryFailoverTxReplicatedTest.java | 32 ++ .../IgniteCacheQuerySelfTestSuite.java | 3 + 8 files changed, 617 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index b7d4375..a0a75c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -592,11 +592,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @return Next update index. */ public long nextContinuousQueryUpdateIndex() { - long res = contQryUpdIdx.incrementAndGet(); - - log.info("Next update index [node=" + cctx.gridName() + ", part=" + id + ", idx=" + res + ']'); - - return res; + return contQryUpdIdx.incrementAndGet(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 4bd2e1c..98e857b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -39,6 +40,7 @@ import javax.cache.event.*; import javax.cache.event.EventType; import java.io.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.events.EventType.*; @@ -50,7 +52,7 @@ class 
CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private static final long serialVersionUID = 0L; /** */ - private static final int BACKUP_ACK_THRESHOLD = 1; + private static final int BACKUP_ACK_THRESHOLD = 100; /** Cache name. */ private String cacheName; @@ -95,10 +97,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private transient Map<Integer, Long> rcvCntrs; /** */ - private transient DuplicateEventFilter dupEvtFilter = new DuplicateEventFilter(); + private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter; /** */ - private transient AcknowledgeData ackData = new AcknowledgeData(); + private transient AcknowledgeBuffer ackBuf; /** * Required by {@link Externalizable}. @@ -121,6 +123,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param ignoreExpired Ignore expired events flag. * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. * @param taskHash Task name hash code. + * @param locCache {@code True} if local cache. 
*/ public CacheContinuousQueryHandler( String cacheName, @@ -133,7 +136,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { boolean sync, boolean ignoreExpired, int taskHash, - boolean skipPrimaryCheck) { + boolean skipPrimaryCheck, + boolean locCache) { assert topic != null; assert locLsnr != null; @@ -149,7 +153,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.taskHash = taskHash; this.skipPrimaryCheck = skipPrimaryCheck; - this.rcvCntrs = new HashMap<>(); + if (locCache) + dupEvtFilter = F.alwaysTrue(); + else { + rcvCntrs = new ConcurrentHashMap<>(); + + dupEvtFilter = new DuplicateEventFilter(); + } } /** {@inheritDoc} */ @@ -187,8 +197,12 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { backupQueue = new ConcurrentLinkedDeque8<>(); + ackBuf = new AcknowledgeBuffer(); + final boolean loc = nodeId.equals(ctx.localNodeId()); + assert !skipPrimaryCheck || loc; + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -217,8 +231,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { GridCacheContext<K, V> cctx = cacheContext(ctx); - if (cctx.isReplicated() && !skipPrimaryCheck && !primary) - return; + // skipPrimaryCheck is set only when listen locally for replicated cache events. + assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); boolean notify = true; @@ -232,30 +246,36 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } if (notify) { - if (loc && dupEvtFilter.apply(evt.entry())) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt)); - else { - try { - final CacheContinuousQueryEntry entry = evt.entry(); + try { + final CacheContinuousQueryEntry entry = evt.entry(); + + if (primary || skipPrimaryCheck) { + if (loc) { + if (dupEvtFilter.apply(entry)) { + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); - if (primary) { + if (!skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } + } + else { prepareEntry(cctx, nodeId, entry); ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } - else - backupQueue.add(entry); } - catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); + else + backupQueue.add(entry); + } + catch (ClusterTopologyCheckedException ex) { + IgniteLogger log = ctx.log(getClass()); - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); } if (recordIgniteEvt) { @@ -292,28 +312,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { while (it.hasNext()) { CacheContinuousQueryEntry backupEntry = it.next(); - assert backupEntry != null; - Long updateIdx = updateIdxs.get(backupEntry.partition()); - if (updateIdx != null) { - assert backupEntry.updateIndex() <= updateIdx; - + if (updateIdx != null && backupEntry.updateIndex() <= updateIdx) it.remove(); - - if (backupEntry.updateIndex() == updateIdx) { - updateIdxs.remove(backupEntry.partition()); - - if (updateIdxs.isEmpty()) - break; - } - } } } @Override 
public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) { - ctx.log(getClass()).info("Flush backup queue [topVer=" + topVer + ", queue=" + backupQueue + ']'); - if (backupQueue.isEmpty()) return; @@ -333,7 +339,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) { - sendBackupAcknowledge(ackData.acknowledgeOnTimeout(), routineId, ctx); + sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); } @Override public void onPartitionEvicted(int part) { @@ -469,30 +475,15 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (e.updateIndex() > cntr0) { // TODO IGNITE-426: remove assert. - if (e.updateIndex() != cntr0 + 1) - System.out.println(Thread.currentThread().getName() + - "Invalid entry [part=" + e.partition() + ", cntr=" + cntr + ", e=" + e + ']'); - assert e.updateIndex() == cntr0 + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']'; - System.out.println(Thread.currentThread().getName() + - " update cntr [part=" + part + ", idx=" + e.updateIndex() + ", e=" + e + ']'); - rcvCntrs.put(part, e.updateIndex()); } - else { - System.out.println(Thread.currentThread().getName() + - " ignore entry [cntr=" + cntr0 + ", idx=" + e.updateIndex() + ", e=" + e + ']'); - + else return false; - } } - else { - System.out.println(Thread.currentThread().getName() + - " update cntr [part=" + part + ", idx=" + e.updateIndex() + ", e=" + e + ']'); - + else rcvCntrs.put(part, e.updateIndex()); - } return true; } @@ -525,7 +516,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { @Override public void onBatchAcknowledged(final UUID routineId, GridContinuousBatch batch, final GridKernalContext ctx) { - sendBackupAcknowledge(ackData.onAcknowledged(batch), routineId, ctx); + sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); } /** @@ -651,14 +642,16 @@ class 
CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** */ - private static class AcknowledgeData { + private static class AcknowledgeBuffer { /** */ private int size; /** */ + @GridToStringInclude private Map<Integer, Long> updateIdxs = new HashMap<>(); /** */ + @GridToStringInclude private Set<AffinityTopologyVersion> topVers = U.newHashSet(1); /** @@ -668,24 +661,42 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { @SuppressWarnings("unchecked") @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> onAcknowledged(GridContinuousBatch batch) { - // TODO: IGNITE-426: check if it is called from nio thread in correct order. size += batch.size(); Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect(); - for (CacheContinuousQueryEntry e : entries) { - topVers.add(e.topologyVersion()); + for (CacheContinuousQueryEntry e : entries) + addEntry(e); - Long cntr0 = updateIdxs.get(e.partition()); + return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } - if (cntr0 == null || e.updateIndex() > cntr0) - updateIdxs.put(e.partition(), e.updateIndex()); - } + /** + * @param e Entry. + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> + onAcknowledged(CacheContinuousQueryEntry e) { + size++; + + addEntry(e); return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; } /** + * @param e Entry. + */ + private void addEntry(CacheContinuousQueryEntry e) { + topVers.add(e.topologyVersion()); + + Long cntr0 = updateIdxs.get(e.partition()); + + if (cntr0 == null || e.updateIndex() > cntr0) + updateIdxs.put(e.partition(), e.updateIndex()); + } + + /** * @return Non-null tuple if acknowledge should be sent to backups. 
*/ @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> @@ -697,6 +708,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @return Tuple with acknowledge information. */ private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() { + assert size > 0; + Map<Integer, Long> idxs = new HashMap<>(updateIdxs); IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res = @@ -711,7 +724,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(AcknowledgeData.class, this); + return S.toString(AcknowledgeBuffer.class, this); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 9eae419..cc8f77b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -58,6 +58,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** */ private static final byte EXPIRED_FLAG = 0b1000; + /** */ + private static final long BACKUP_ACK_FREQ = 5000; + /** Listeners. 
*/ private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>(); @@ -94,6 +97,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.cleanupBackupQueue(msg.updateIndexes()); } }); + + cctx.time().schedule(new Runnable() { + @Override public void run() { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext()); + + for (CacheContinuousQueryListener lsnr : intLsnrs.values()) + lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext()); + } + }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ); } /** {@inheritDoc} */ @@ -166,11 +179,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (!hasNewVal && !hasOldVal) return; - GridDhtLocalPartition locPart = cctx.topology().localPartition(e.partition(), topVer, false); + long updateIdx; - assert locPart != null; + if (!cctx.isLocal()) { + GridDhtLocalPartition locPart = cctx.topology().localPartition(e.partition(), topVer, false); - long updateIdx = locPart.nextContinuousQueryUpdateIndex(); + assert locPart != null; + + updateIdx = locPart.nextContinuousQueryUpdateIndex(); + } + else + updateIdx = 0; EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; @@ -206,12 +225,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { updateIdx, topVer); - log.info("Created entry [node=" + cctx.gridName() + - ", primary=" + primary + - ", preload=" + preload + - ", part=" + e.partition() + - ", idx=" + updateIdx + ']'); - CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -405,7 +418,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.onPartitionEvicted(part); } - /** * @param locLsnr Local listener. * @param rmtFilter Remote filter. 
@@ -480,7 +492,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { sync, ignoreExpired, taskNameHash, - skipPrimaryCheck); + skipPrimaryCheck, + cctx.isLocal()); UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, autoUnsubscribe, grp.predicate()).get(); @@ -702,6 +715,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * @param impl Listener. + * @param log Logger. */ JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) { assert impl != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index 1eb3ae7..3afb01c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -23,8 +23,10 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; @@ -33,13 +35,17 @@ import 
org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; import javax.cache.event.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** @@ -47,7 +53,10 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; */ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommonAbstractTest { /** */ - protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int BACKUP_ACK_THRESHOLD = 100; /** */ private static volatile boolean err; @@ -62,6 +71,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + TestCommunicationSpi commSpi = new TestCommunicationSpi(); commSpi.setIdleConnectionTimeout(100); @@ -72,6 +84,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo ccfg.setCacheMode(cacheMode()); ccfg.setAtomicityMode(atomicityMode()); + ccfg.setAtomicWriteOrderMode(writeOrderMode()); ccfg.setBackups(backups); ccfg.setWriteSynchronizationMode(FULL_SYNC); @@ -83,6 +96,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest 
extends GridCommo } /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -107,6 +125,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo protected abstract CacheAtomicityMode atomicityMode(); /** + * @return Write order mode for atomic cache. + */ + protected CacheAtomicWriteOrderMode writeOrderMode() { + return CLOCK; + } + + /** * @throws Exception If failed. */ public void testOneBackup() throws Exception { @@ -117,6 +142,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @throws Exception If failed. */ public void testThreeBackups() throws Exception { + if (cacheMode() == REPLICATED) + return; + checkBackupQueue(3); } @@ -129,7 +157,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final int SRV_NODES = 4; - startGrids(SRV_NODES); + startGridsMultiThreaded(SRV_NODES); client = true; @@ -139,16 +167,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo IgniteCache<Object, Object> qryClientCache = qryClient.cache(null); - assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups()); + if (cacheMode() != REPLICATED) + assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups()); Affinity<Object> aff = qryClient.affinity(null); - CacheEventListener lsnr = new CacheEventListener(); + CacheEventListener1 lsnr = new CacheEventListener1(); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - qry.setAutoUnsubscribe(true); - qry.setLocalListener(lsnr); QueryCursor<?> cur = qryClientCache.query(qry); @@ -188,8 +215,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + 
lsnr.latch.getCount() + ']'); - - awaitPartitionMapExchange(); } for (int i = 0; i < SRV_NODES - 1; i++) { @@ -197,8 +222,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo Ignite ignite = startGrid(i); - awaitPartitionMapExchange(); - IgniteCache<Object, Object> cache = ignite.cache(null); List<Integer> keys = testKeys(cache, PARTS); @@ -219,7 +242,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo cur.close(); - assertFalse(err); + assertFalse("Unexpected error during test, see log for details.", err); } /** @@ -265,83 +288,427 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @throws Exception If failed. */ - public void testBackupQueueCleanup() throws Exception { - startGrids(2); + public void testBackupQueueCleanupClientQuery() throws Exception { + startGridsMultiThreaded(2); - Ignite ignite0 = ignite(0); - Ignite ignite1 = ignite(1); + client = true; - CacheEventListener lsnr = new CacheEventListener(); + Ignite qryClient = startGrid(2); - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + CacheEventListener1 lsnr = new CacheEventListener1(); - qry.setAutoUnsubscribe(true); + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); qry.setLocalListener(lsnr); - QueryCursor<?> cur = ignite1.cache(null).query(qry); + QueryCursor<?> cur = qryClient.cache(null).query(qry); + + final Collection<Object> backupQueue = backupQueue(ignite(1)); - IgniteCache<Object, Object> cache0 = ignite0.cache(null); + assertEquals(0, backupQueue.size()); - final int KEYS = 1; + IgniteCache<Object, Object> cache0 = ignite(0).cache(null); - List<Integer> keys = primaryKeys(cache0, KEYS); + List<Integer> keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD); CountDownLatch latch = new CountDownLatch(keys.size()); lsnr.latch = latch; + for (Integer key : keys) { + log.info("Put: " + key); + + cache0.put(key, key); + } + + GridTestUtils.waitForCondition(new 
GridAbsPredicate() { + @Override public boolean apply() { + return backupQueue.isEmpty(); + } + }, 2000); + + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty()); + + if (!latch.await(5, SECONDS)) + fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); + + keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD / 2); + + latch = new CountDownLatch(keys.size()); + + lsnr.latch = latch; + for (Integer key : keys) cache0.put(key, key); + final long ACK_FREQ = 5000; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return backupQueue.isEmpty(); + } + }, ACK_FREQ + 2000); + + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty()); + if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); - Thread.sleep(10_000); + cur.close(); + + assertFalse("Unexpected error during test, see log for details.", err); + } + + /** + * @throws Exception If failed. 
+ */ + public void testBackupQueueCleanupServerQuery() throws Exception { + Ignite qryClient = startGridsMultiThreaded(2); + + CacheEventListener1 lsnr = new CacheEventListener1(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + IgniteCache<Object, Object> cache = qryClient.cache(null); + + QueryCursor<?> cur = cache.query(qry); + + final Collection<Object> backupQueue = backupQueue(ignite(1)); + + assertEquals(0, backupQueue.size()); + + List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD); + + CountDownLatch latch = new CountDownLatch(keys.size()); + + lsnr.latch = latch; + + for (Integer key : keys) { + log.info("Put: " + key); + + cache.put(key, key); + } + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return backupQueue.isEmpty(); + } + }, 2000); + + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty()); + + if (!latch.await(5, SECONDS)) + fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); + + cur.close(); + } + + /** + * @param ignite Ignite. + * @return Backup queue for test query. + */ + private Collection<Object> backupQueue(Ignite ignite) { + GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous(); + + ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos"); + + Collection<Object> backupQueue = null; + + for (Object info : infos.values()) { + GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd"); + + if (hnd.isForQuery() && hnd.cacheName() == null) { + backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue"); + + break; + } + } + + assertNotNull(backupQueue); + + return backupQueue; + } + + /** + * @throws Exception If failed. 
+ */ + public void _testFailover() throws Exception { + final int SRV_NODES = 4; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + Ignite qryClient = startGrid(SRV_NODES); + + client = false; + + IgniteCache<Object, Object> qryClientCache = qryClient.cache(null); + + final CacheEventListener2 lsnr = new CacheEventListener2(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = qryClientCache.query(qry); + + final AtomicBoolean stop = new AtomicBoolean(); + + final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>(); + + IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + final int idx = SRV_NODES + 1; + + while (!stop.get() && !err) { + log.info("Start node: " + idx); + + startGrid(idx); + + Thread.sleep(2000); + + log.info("Stop node: " + idx); + + stopGrid(idx); + + Thread.sleep(1000); + + CountDownLatch latch = new CountDownLatch(1); + + assertTrue(checkLatch.compareAndSet(null, latch)); + + if (!stop.get()) { + log.info("Wait for event check."); + + assertTrue(latch.await(1, MINUTES)); + } + } + + return null; + } + }); + + final Map<Integer, Integer> vals = new HashMap<>(); + + try { + long stopTime = System.currentTimeMillis() + 3 * 60_000; + + final int PARTS = qryClient.affinity(null).partitions(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (System.currentTimeMillis() < stopTime) { + Integer key = rnd.nextInt(PARTS); + + Integer val = vals.get(key); + + if (val == null) + val = 0; + else + val = val + 1; + + qryClientCache.put(key, val); + + vals.put(key, val); + + CountDownLatch latch = checkLatch.get(); + + if (latch != null) { + log.info("Check events."); + + checkLatch.set(null); + + boolean success = false; + + try { + if (err) + break; + + boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { 
+ return checkEvents(false, vals, lsnr); + } + }, 10_000); + + if (!check) + assertTrue(checkEvents(true, vals, lsnr)); + + success = true; + + log.info("Events checked."); + } + finally { + if (!success) + err = true; + + latch.countDown(); + } + } + } + } + finally { + stop.set(true); + } + + CountDownLatch latch = checkLatch.get(); + + if (latch != null) + latch.countDown(); + + restartFut.get(); + + boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return checkEvents(false, vals, lsnr); + } + }, 10_000); + + if (!check) + assertTrue(checkEvents(true, vals, lsnr)); cur.close(); + + assertFalse("Unexpected error during test, see log for details.", err); + } + + /** + * @param logAll If {@code true} logs all unexpected values. + * @param vals Expected values. + * @param lsnr Listener. + * @return Check status. + */ + private boolean checkEvents(boolean logAll, Map<Integer, Integer> vals, CacheEventListener2 lsnr) { + assertTrue(!vals.isEmpty()); + + ConcurrentHashMap<Integer, Integer> lsnrVals = lsnr.vals; + + ConcurrentHashMap<Integer, Integer> lsnrCntrs = lsnr.cntrs; + + boolean pass = true; + + for (Map.Entry<Integer, Integer> e : vals.entrySet()) { + Integer key = e.getKey(); + + Integer lsnrVal = lsnrVals.get(key); + Integer expVal = e.getValue(); + + if (!expVal.equals(lsnrVal)) { + pass = false; + + log.info("Unexpected value [key=" + key + ", val=" + lsnrVal + ", expVal=" + expVal + ']'); + + if (!logAll) + return false; + } + + Integer lsnrCntr = lsnrCntrs.get(key); + Integer expCntr = expVal + 1; + + if (!expCntr.equals(lsnrCntr)) { + pass = false; + + log.info("Unexpected events count [key=" + key + ", val=" + lsnrCntr + ", expVal=" + expCntr + ']'); + + if (!logAll) + return false; + } + } + + return pass; } /** * */ - private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> { + private static class CacheEventListener1 implements
CacheEntryUpdatedListener<Object, Object> { /** */ private volatile CountDownLatch latch; /** */ - @IgniteInstanceResource - private Ignite ignite; + @LoggerResource + private IgniteLogger log; /** {@inheritDoc} */ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { - for (CacheEntryEvent<?, ?> evt : evts) { - ignite.log().info("Received cache event: " + evt); + try { + for (CacheEntryEvent<?, ?> evt : evts) { + CountDownLatch latch = this.latch; - CountDownLatch latch = this.latch; + log.info("Received cache event: " + evt + " " + (latch != null ? latch.getCount() : null)); - testAssert(evt, latch != null); - testAssert(evt, latch.getCount() > 0); + assertTrue(latch != null); + assertTrue(latch.getCount() > 0); - latch.countDown(); + latch.countDown(); - if (latch.getCount() == 0) - this.latch = null; + if (latch.getCount() == 0) + this.latch = null; + } + } + catch (Throwable e) { + err = true; + + log.error("Unexpected error", e); } } + } + + /** + * Listener tracking the last value and event count per key; sets {@code err} if an event fails validation. + */ + private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> { + /** Logger. */ + @LoggerResource + private IgniteLogger log; - /** - * @param evt Event. - * @param cond Condition to check.
- */ - private void testAssert(CacheEntryEvent evt, boolean cond) { - if (!cond) { - ignite.log().info("Unexpected event: " + evt); + /** Last value received for each key. */ + private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>(); - err = true; + /** Number of events received for each key. */ + private final ConcurrentHashMap<Integer, Integer> cntrs = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) + throws CacheEntryListenerException { + try { + for (CacheEntryEvent<?, ?> evt : evts) { + Integer key = (Integer)evt.getKey(); + Integer val = (Integer)evt.getValue(); + + assertNotNull(key); + assertNotNull(val); + + Integer prevVal = vals.get(key); + + if (prevVal != null) { + assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val); + assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue()); + } + else { + assertEquals("Unexpected event: " + evt, (Object)0, val); + assertNull("Unexpected event: " + evt, evt.getOldValue()); + } + + vals.put(key, val); + + Integer cntr = cntrs.get(key); + + if (cntr == null) + cntr = 1; + else + cntr = cntr + 1; + + cntrs.put(key, cntr); + } } + catch (Throwable e) { + err = true; - assertTrue(cond); + log.error("Unexpected error", e); + } } } @@ -357,7 +724,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo private volatile boolean skipMsg; /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { if (skipMsg && msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); @@ -369,7 +736,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo } } - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); } } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java new file mode 100644 index 0000000..6515b21 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; + +/** + * {@link CacheContinuousQueryFailoverAtomicTest} variant whose {@link #writeOrderMode()} returns {@code PRIMARY}. + */ +public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAtomicTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode writeOrderMode() { + return PRIMARY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java new file mode 100644 index 0000000..c8209d9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * {@link CacheContinuousQueryFailoverAtomicTest} variant whose {@link #cacheMode()} returns {@code REPLICATED}. + */ +public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheContinuousQueryFailoverAtomicTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java new file mode 100644 index 0000000..3ac0c64 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * {@link CacheContinuousQueryFailoverTxTest} variant whose {@link #cacheMode()} returns {@code REPLICATED}. + */ +public class CacheContinuousQueryFailoverTxReplicatedTest extends CacheContinuousQueryFailoverTxTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 2240226..8a7039e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -99,7 +99,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class); suite.addTestSuite(CacheContinuousQueryFailoverAtomicTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);