Repository: incubator-ignite Updated Branches: refs/heads/ignite-426 fa494ff1c -> 31179c17c
# ignite-426 backup queue flush, test Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/31179c17 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/31179c17 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/31179c17 Branch: refs/heads/ignite-426 Commit: 31179c17c01dc80e7f6d2584c84690587d86a61c Parents: fa494ff Author: sboikov <sboi...@gridgain.com> Authored: Tue Aug 11 18:03:28 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Aug 11 18:03:28 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 4 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 1 - .../GridDhtPartitionsExchangeFuture.java | 5 + .../continuous/CacheContinuousQueryEvent.java | 6 +- .../continuous/CacheContinuousQueryHandler.java | 100 ++++++- .../CacheContinuousQueryListener.java | 9 + .../continuous/CacheContinuousQueryManager.java | 34 ++- .../continuous/GridContinuousMessage.java | 1 + .../continuous/GridContinuousProcessor.java | 65 ++++- ...acheContinuousQueryFailoverAbstractTest.java | 272 +++++++++++++++++++ .../CacheContinuousQueryFailoverAtomicTest.java | 38 +++ 11 files changed, 506 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 2864fa4..9e7d930 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -96,7 +96,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>(); /** Continuous query update index. */ - private final AtomicLong contQueryUpdIdx = new AtomicLong(); + private final AtomicLong contQryUpdIdx = new AtomicLong(); /** * @param cctx Context. @@ -590,7 +590,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @return Next update index. */ public long nextContinuousQueryUpdateIndex() { - return contQueryUpdIdx.incrementAndGet(); + return contQryUpdIdx.incrementAndGet(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 63edcaa..601f1d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -450,5 +450,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> @Override public String toString() { return S.toString(GridDhtAtomicUpdateFuture.class, this); } - } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index cbf6b40..99c7fc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -757,6 +757,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT; + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -767,6 +769,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (drCacheCtx.isDrEnabled()) drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft()); + if (topChanged) + cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion()); + // Partition release future is done so we can flush the write-behind store. 
cacheCtx.store().forceFlush(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index 4a0d6f7..96fd4ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -58,8 +58,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { } /** {@inheritDoc} */ - @Override - public K getKey() { + @Override public K getKey() { return e.key().value(cctx.cacheObjectContext(), false); } @@ -69,8 +68,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { } /** {@inheritDoc} */ - @Override - public V getOldValue() { + @Override public V getOldValue() { return CU.value(e.oldValue(), cctx, false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 88ae39b..8e308a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -85,7 +86,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private transient boolean skipPrimaryCheck; /** Backup queue. */ - private transient Queue<CacheContinuousQueryEntry> backupQueue; + private transient Collection<CacheContinuousQueryEntry> backupQueue; + + /** */ + private transient Map<Integer, Long> rcvCntrs; + + /** */ + private transient DuplicateEventFilter dupEvtFilter = new DuplicateEventFilter(); /** * Required by {@link Externalizable}. @@ -135,6 +142,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.ignoreExpired = ignoreExpired; this.taskHash = taskHash; this.skipPrimaryCheck = skipPrimaryCheck; + + this.rcvCntrs = new HashMap<>(); } /** {@inheritDoc} */ @@ -216,23 +225,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } if (notify) { - if (loc) + if (loc && dupEvtFilter.apply(evt.entry())) locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt)); else { try { final CacheContinuousQueryEntry entry = evt.entry(); if (primary) { - if (ctx.config().isPeerClassLoadingEnabled() && ctx.discovery().node(nodeId) != null) { - entry.prepareMarshal(cctx); - - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); - - depMgr.prepare(entry); - } - else - entry.prepareMarshal(cctx); + prepareEntry(cctx, nodeId, entry); ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } @@ -304,6 +304,25 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } + @Override public void flushBackupQueue(GridKernalContext ctx) { + if (backupQueue.isEmpty()) + return; + + try { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + for (CacheContinuousQueryEntry e : backupQueue) + prepareEntry(cctx, nodeId, e); + + ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic); + + backupQueue.clear(); + } + catch (IgniteCheckedException e) { + U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e); + } + } + @Override public boolean oldValueRequired() { return oldValRequired; } @@ -325,6 +344,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { return mgr.registerListener(routineId, lsnr, internal); } + /** + * @param cctx Context. + * @param nodeId ID of the node that started routine. + * @param entry Entry. + * @throws IgniteCheckedException In case of error. + */ + private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry) + throws IgniteCheckedException { + if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) { + entry.prepareMarshal(cctx); + + cctx.deploy().prepare(entry); + } + else + entry.prepareMarshal(cctx); + } + /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { // No-op. 
@@ -392,12 +428,40 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { return new CacheContinuousQueryEvent<>(cache, cctx, e); } - } + }, + dupEvtFilter ); locLsnr.onUpdated(evts); } + /** + * @param e Entry. + * @return {@code True} if listener should be notified, {@code false} if entry update index is not greater than the last index recorded for its partition. + */ + private boolean notifyListener(CacheContinuousQueryEntry e) { + Integer part = e.partition(); + + Long cntr = rcvCntrs.get(part); + + if (cntr != null) { + long cntr0 = cntr; + + if (e.updateIndex() > cntr0) { + // TODO IGNITE-426: remove assert. + assert e.updateIndex() == cntr0 + 1 : "Invalid entry [e=" + e + ", cntr=" + cntr + ']'; + + rcvCntrs.put(part, e.updateIndex()); + } + else + return false; + } + else + rcvCntrs.put(part, e.updateIndex()); + + return true; + } + /** {@inheritDoc} */ @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { assert ctx != null; @@ -530,6 +594,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** + * + */ + private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry> { + /** {@inheritDoc} */ + @Override public boolean apply(CacheContinuousQueryEntry e) { + return notifyListener(e); + } + } + + /** * Deployable object. 
*/ private static class DeployableObject implements Externalizable { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index d5d5ff8..d955aa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import org.apache.ignite.internal.*; + import java.util.*; /** @@ -50,6 +52,13 @@ interface CacheContinuousQueryListener<K, V> { public void cleanupBackupQueue(Map<Integer, Long> updateIdxs); /** + * Flushes backup queue. + * + * @param ctx Context. + */ + public void flushBackupQueue(GridKernalContext ctx); + + /** * @return Whether old value is required. 
*/ public boolean oldValueRequired(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index c6a16c9..ce2b111 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -127,7 +127,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param primary {@code True} if called on primary node. * @param preload Whether update happened during preloading. + * @param topVer Topology version. * @throws IgniteCheckedException In case of error. 
*/ public void onEntryUpdated(GridCacheEntryEx e, @@ -141,8 +143,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { { assert e != null; assert key != null; - - assert Thread.holdsLock(e); + assert Thread.holdsLock(e) : e; boolean internal = e.isInternal() || !e.context().userCache(); @@ -175,7 +176,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean initialized = false; - boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { if (preload && !lsnr.notifyExisting()) @@ -204,6 +205,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { e.partition(), updateIdx); + log.info("Created entry [node=" + cctx.gridName() + + ", primary=" + primary + + ", preload=" + preload + + ", part=" + e.partition() + + ", idx=" + updateIdx + ']'); + CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -221,8 +228,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { throws IgniteCheckedException { assert e != null; assert key != null; - - assert Thread.holdsLock(e); + assert Thread.holdsLock(e) : e; if (e.isInternal()) return; @@ -374,6 +380,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @param topVer Topology version. + */ + public void beforeExchange(AffinityTopologyVersion topVer) { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.flushBackupQueue(cctx.kernalContext()); + + for (CacheContinuousQueryListener lsnr : intLsnrs.values()) + lsnr.flushBackupQueue(cctx.kernalContext()); + } + + /** * @param locLsnr Local listener. * @param rmtFilter Remote filter. * @param bufSize Buffer size. 
@@ -493,7 +510,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { GridCacheEntryEx e = it.next(); CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( - cctx.cacheId(), CREATED, e.key(), e.rawGet(), null, 0, 0); + cctx.cacheId(), + CREATED, + e.key(), + e.rawGet(), + null, + 0, 0); next = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java index fe50fd8..4c7f8e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java @@ -49,6 +49,7 @@ public class GridContinuousMessage implements Message { private Object data; /** */ + @GridToStringInclude @GridDirectCollection(Message.class) private Collection<Message> msgs; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 457f150..7e71b3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -593,6 +593,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param nodeId ID of the node that started routine. * @param routineId Routine ID. + * @param objs Notification objects. + * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. + * @throws IgniteCheckedException In case of error. + */ + public void addBackupNotification(UUID nodeId, + final UUID routineId, + Collection<?> objs, + @Nullable Object orderedTopic) + throws IgniteCheckedException { + if (processorStopped) + return; + + final RemoteRoutineInfo info = rmtInfos.get(routineId); + + if (info != null) { + final GridContinuousBatch batch = info.addAll(objs); + + sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null); + } + } + + /** + * @param nodeId ID of the node that started routine. + * @param routineId Routine ID. * @param obj Notification object. * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. * @param sync If {@code true} then waits for event acknowledgment. 
@@ -642,7 +666,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { final GridContinuousBatch batch = info.add(obj); if (batch != null) { - CI1<IgniteException> ackClosure = new CI1<IgniteException>() { + CI1<IgniteException> ackC = new CI1<IgniteException>() { @Override public void apply(IgniteException e) { if (e == null) { try { @@ -655,7 +679,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } }; - sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackClosure); + sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC); } } } @@ -904,7 +928,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } }; - sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg, + sendNotification(nodeId, + routineId, + null, + toSnd, + hnd.orderedTopic(), + msg, ackClosure); } catch (ClusterTopologyCheckedException ignored) { @@ -1212,6 +1241,36 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param objs Objects to add. + * @return Batch to send. + */ + GridContinuousBatch addAll(Collection<?> objs) { + assert objs != null; + assert objs.size() > 0; + + GridContinuousBatch toSnd = null; + + lock.writeLock().lock(); + + try { + for (Object obj : objs) + batch.add(obj); + + toSnd = batch; + + batch = hnd.createBatch(); + + if (interval > 0) + lastSndTime = U.currentTimeMillis(); + } + finally { + lock.writeLock().unlock(); + } + + return toSnd; + } + + /** * @param obj Object to add. * @return Batch to send or {@code null} if there is nothing to send for now. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java new file mode 100644 index 0000000..fe3c817 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.event.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder).setForceServerMode(true); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(cacheMode()); + ccfg.setAtomicityMode(atomicityMode()); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @return Cache mode. 
+ */ + protected abstract CacheMode cacheMode(); + + /** + * @return Atomicity mode. + */ + protected abstract CacheAtomicityMode atomicityMode(); + + /** + * @throws Exception If failed. + */ + public void testBackupQueue() throws Exception { + final int SRV_NODES = 4; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + Ignite qryClient = startGrid(SRV_NODES); + + client = false; + + IgniteCache<Object, Object> qryClientCache = qryClient.cache(null); + + CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setAutoUnsubscribe(true); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = qryClientCache.query(qry); + + int PARTS = 1; + + for (int i = 0; i < SRV_NODES - 1; i++) { + log.info("Stop iteration: " + i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); + + Ignite ignite = ignite(i); + + IgniteCache<Object, Object> cache = ignite.cache(null); + + List<Integer> keys = testKeys(cache, PARTS); + + lsnr.latch = new CountDownLatch(keys.size()); + + boolean first = true; + + for (Integer key : keys) { + log.info("Put [node=" + ignite.name() + ", key=" + key + ']'); + + cache.put(key, key); + + if (first) { + spi.skipMsg = true; + + first = false; + } + } + + stopGrid(i); + + assertTrue("Failed to wait for notifications", lsnr.latch.await(5, SECONDS)); + + lsnr.latch = null; + + awaitPartitionMapExchange(); + } + + for (int i = 0; i < SRV_NODES - 1; i++) { + log.info("Start iteration: " + i); + + Ignite ignite = startGrid(i); + + awaitPartitionMapExchange(); + + IgniteCache<Object, Object> cache = ignite.cache(null); + + List<Integer> keys = testKeys(cache, PARTS); + + lsnr.latch = new CountDownLatch(keys.size()); + + for (Integer key : keys) { + log.info("Put [node=" + ignite.name() + ", key=" + key + ']'); + + cache.put(key, key); + } + + if (!lsnr.latch.await(5, SECONDS)) + fail("Failed to wait for notifications [exp=" + 
keys.size() + ", left=" + lsnr.latch.getCount() + ']'); + + lsnr.latch = null; + } + + cur.close(); + } + + /** + * @param cache Cache. + * @param parts Number of partitions. + * @return Keys. + */ + private List<Integer> testKeys(IgniteCache<Object, Object> cache, int parts) { + Ignite ignite = cache.unwrap(Ignite.class); + + List<Integer> res = new ArrayList<>(); + + Affinity<Object> aff = ignite.affinity(cache.getName()); + + ClusterNode node = ignite.cluster().localNode(); + + int[] nodeParts = aff.primaryPartitions(node); + + final int KEYS_PER_PART = 1; + + for (int i = 0; i < parts; i++) { + int part = nodeParts[i]; + + int cnt = 0; + + for (int key = 0; key < 100_000; key++) { + if (aff.partition(key) == part && aff.isPrimary(node, key)) { + res.add(key); + + if (++cnt == KEYS_PER_PART) + break; + } + } + + assertEquals(KEYS_PER_PART, cnt); + } + + assertEquals(parts * KEYS_PER_PART, res.size()); + + return res; + } + + /** + * + */ + private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> { + /** */ + private volatile CountDownLatch latch; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) { + ignite.log().info("Received cache event: " + evt); + + CountDownLatch latch = this.latch; + + assertTrue(latch != null); + assertTrue(latch.getCount() > 0); + + latch.countDown(); + } + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + @LoggerResource + private IgniteLogger log; + + /** */ + private volatile boolean skipMsg; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { + if (skipMsg && msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof 
GridContinuousMessage) { + log.info("Skip continuous message: " + msg0); + + return; + } + } + + super.sendMessage(node, msg, ackClosure); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java new file mode 100644 index 0000000..8b38b7a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class CacheContinuousQueryFailoverAtomicTest extends CacheContinuousQueryFailoverAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } +}