Repository: incubator-ignite Updated Branches: refs/heads/ignite-426 [created] 9fe0a8d56
IGNITE-426 - Continuous queries failover Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9fe0a8d5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9fe0a8d5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9fe0a8d5 Branch: refs/heads/ignite-426 Commit: 9fe0a8d562084e15c77e6023c5162c1741b7bc93 Parents: 19fb305 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon Aug 10 18:05:58 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon Aug 10 18:05:58 2015 -0700 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 14 +- .../internal/GridMessageListenHandler.java | 10 ++ .../communication/GridIoMessageFactory.java | 7 +- .../processors/cache/GridCacheMapEntry.java | 18 +-- .../distributed/dht/GridDhtLocalPartition.java | 10 ++ .../CacheContinuousQueryBatchAck.java | 151 +++++++++++++++++++ .../continuous/CacheContinuousQueryEntry.java | 58 ++++++- .../continuous/CacheContinuousQueryHandler.java | 96 ++++++++++-- .../CacheContinuousQueryListener.java | 9 ++ .../continuous/CacheContinuousQueryManager.java | 40 ++++- .../continuous/GridContinuousQueryBatch.java | 47 ++++++ .../continuous/GridContinuousBatch.java | 44 ++++++ .../continuous/GridContinuousBatchAdapter.java | 47 ++++++ .../continuous/GridContinuousHandler.java | 18 +++ .../continuous/GridContinuousProcessor.java | 123 +++++++++------ 15 files changed, 615 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index f33fa39..0867c6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -197,8 +197,8 @@ class GridEventConsumeHandler implements GridContinuousHandler { } } - ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false, - false); + ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, + false, false); } catch (ClusterTopologyCheckedException ignored) { // No-op. @@ -361,6 +361,16 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 4bfb57b..e1ba4e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -162,6 +162,16 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(UUID routineId, 
GridContinuousBatch batch, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 7fe8da8..6103a46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -600,7 +600,12 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..112] - this + case 113: + msg = new CacheContinuousQueryBatchAck(); + + break; + + // [-3..113] - this // [120..123] - DR // [-4..-22] - SQL default: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f85a18b..8a34441 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1061,8 +1061,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, null, taskName); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && 
!isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); + if (!isNear()) + cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, topVer); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1219,8 +1219,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme taskName); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, null, old, false); + if (!isNear()) + cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, topVer); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1557,7 +1557,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); + if (!isNear()) + cctx.continuousQueries().onEntryUpdated(this, key, val, old, true, false, AffinityTopologyVersion.NONE); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2167,8 +2168,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false); + if (!isNear()) + cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, primary, false, topVer); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2984,8 +2985,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme drReplicate(drType, val, ver); if (!skipQryNtf) { - if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) - cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload); + cctx.continuousQueries().onEntryUpdated(this, key, val, null, true, preload, topVer); cctx.dataStructures().onEntryUpdated(key, false); } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 87c7f0e..d5d65b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -95,6 +95,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, /** Group reservations. */ private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>(); + /** Continuous query update index. */ + private final AtomicLong contQueryUpdIdx = new AtomicLong(); + /** * @param cctx Context. * @param id Partition ID. @@ -579,6 +582,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** + * @return Next update index. + */ + public long nextContinuousQueryUpdateIndex() { + return contQueryUpdIdx.incrementAndGet(); + } + + /** * Clears values for this partition. 
*/ private void clearAll() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java new file mode 100644 index 0000000..49c1a3f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; +import java.util.*; + +/** + * Batch acknowledgement. 
+ */ +public class CacheContinuousQueryBatchAck extends GridCacheMessage { + /** Routine ID. */ + private UUID routineId; + + /** Update indexes. */ + @GridToStringInclude + @GridDirectMap(keyType = Integer.class, valueType = Long.class) + private Map<Integer, Long> updateIdxs; + + /** + * Default constructor. + */ + public CacheContinuousQueryBatchAck() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param routineId Routine ID. + * @param updateIdxs Update indexes. + */ + CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map<Integer, Long> updateIdxs) { + this.cacheId = cacheId; + this.routineId = routineId; + this.updateIdxs = updateIdxs; + } + + /** + * @return Routine ID. + */ + UUID routineId() { + return routineId; + } + + /** + * @return Update indexes. + */ + Map<Integer, Long> updateIndexes() { + return updateIdxs; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeUuid("routineId", routineId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + routineId = reader.readUuid("routineId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, 
MessageCollectionItemType.LONG, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 113; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryBatchAck.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 060afb9..956a99b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -70,6 +70,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { @GridDirectTransient private GridDeploymentInfo depInfo; + /** Partition. */ + private int part; + + /** Update index. */ + private long updateIdx; + /** * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. */ @@ -83,18 +89,24 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param part Partition. + * @param updateIdx Update index. 
*/ CacheContinuousQueryEntry( int cacheId, EventType evtType, KeyCacheObject key, @Nullable CacheObject newVal, - @Nullable CacheObject oldVal) { + @Nullable CacheObject oldVal, + int part, + long updateIdx) { this.cacheId = cacheId; this.evtType = evtType; this.key = key; this.newVal = newVal; this.oldVal = oldVal; + this.part = part; + this.updateIdx = updateIdx; } /** @@ -112,6 +124,20 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * @return Partition. + */ + int partition() { + return part; + } + + /** + * @return Update index. + */ + long updateIndex() { + return updateIdx; + } + + /** * @param cctx Cache context. * @throws IgniteCheckedException In case of error. */ @@ -220,6 +246,18 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); + case 5: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeLong("updateIdx", updateIdx)) + return false; + + writer.incrementState(); + } return true; @@ -277,6 +315,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); + case 5: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + updateIdx = reader.readLong("updateIdx"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return true; @@ -284,7 +338,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 7; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 879c30c..88ae39b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -23,13 +23,16 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; +import org.jsr166.*; import javax.cache.event.*; import javax.cache.event.EventType; @@ -81,6 +84,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Whether to skip primary check for REPLICATED cache. */ private transient boolean skipPrimaryCheck; + /** Backup queue. */ + private transient Queue<CacheContinuousQueryEntry> backupQueue; + /** * Required by {@link Externalizable}. 
*/ @@ -164,6 +170,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (rmtFilter != null) ctx.resource().injectGeneric(rmtFilter); + backupQueue = new ConcurrentLinkedDeque8<>(); + final boolean loc = nodeId.equals(ctx.localNodeId()); CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @@ -212,20 +220,24 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); else { try { - ClusterNode node = ctx.discovery().node(nodeId); + final CacheContinuousQueryEntry entry = evt.entry(); + + if (primary) { + if (ctx.config().isPeerClassLoadingEnabled() && ctx.discovery().node(nodeId) != null) { + entry.prepareMarshal(cctx); - if (ctx.config().isPeerClassLoadingEnabled() && node != null) { - evt.entry().prepareMarshal(cctx); + GridCacheDeploymentManager depMgr = + ctx.cache().internalCache(cacheName).context().deploy(); - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); + depMgr.prepare(entry); + } + else + entry.prepareMarshal(cctx); - depMgr.prepare(evt.entry()); + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } else - evt.entry().prepareMarshal(cctx); - - ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true); + backupQueue.add(entry); } catch (ClusterTopologyCheckedException ex) { IgniteLogger log = ctx.log(getClass()); @@ -267,6 +279,31 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { ((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister(); } + @Override public void cleanupBackupQueue(Map<Integer, Long> updateIdxs) { + Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); + + while (it.hasNext()) { + CacheContinuousQueryEntry backupEntry = it.next(); + + assert backupEntry != null; + + Long updateIdx = updateIdxs.get(backupEntry.partition()); + + if (updateIdx 
!= null) { + assert backupEntry.updateIndex() <= updateIdx; + + it.remove(); + + if (backupEntry.updateIndex() == updateIdx) { + updateIdxs.remove(backupEntry.partition()); + + if (updateIdxs.isEmpty()) + break; + } + } + } + } + @Override public boolean oldValueRequired() { return oldValRequired; } @@ -298,7 +335,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { assert routineId != null; assert ctx != null; - GridCacheAdapter<K, V> cache = ctx.cache().<K, V>internalCache(cacheName); + GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); if (cache != null) cache.context().continuousQueries().unregisterListener(internal, routineId); @@ -381,6 +418,43 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousQueryBatch(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) + throws IgniteCheckedException { + GridContinuousQueryBatch qryBatch = (GridContinuousQueryBatch)batch; + + GridCacheContext<K, V> cctx = cacheContext(ctx); + + Collection<ClusterNode> nodes = new HashSet<>(); + + cctx.topology().readLock(); + + try { + AffinityTopologyVersion topVer = cctx.topology().topologyVersion(); + + for (Integer part : qryBatch.updateIndexes().keySet()) { + for (ClusterNode node : cctx.dht().topology().nodes(part, topVer)) { + if (!node.equals(cctx.localNode())) + nodes.add(node); + } + } + } + finally { + cctx.topology().readUnlock(); + } + + CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(), routineId, + qryBatch.updateIndexes()); + + for (ClusterNode node : nodes) + cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return topic; } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index a21213f..d5d5ff8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import java.util.*; + /** * Continuous query listener. */ @@ -41,6 +43,13 @@ interface CacheContinuousQueryListener<K, V> { public void onUnregister(); /** + * Cleans backup queue. + * + * @param updateIdxs Update indexes map. + */ + public void cleanupBackupQueue(Map<Integer, Long> updateIdxs); + + /** * @return Whether old value is required. 
*/ public boolean oldValueRequired(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 6277c5d..c6a16c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -23,6 +23,7 @@ import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -83,6 +84,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { @Override protected void start0() throws IgniteCheckedException { // Append cache name to the topic. topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? 
"" : "_" + cctx.name()); + + cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, + new CI2<UUID, CacheContinuousQueryBatchAck>() { + @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { + CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); + + if (lsnr != null) + lsnr.cleanupBackupQueue(msg.updateIndexes()); + } + }); } /** {@inheritDoc} */ @@ -123,12 +134,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { KeyCacheObject key, CacheObject newVal, CacheObject oldVal, - boolean preload) + boolean primary, + boolean preload, + AffinityTopologyVersion topVer) throws IgniteCheckedException { assert e != null; assert key != null; + assert Thread.holdsLock(e); + boolean internal = e.isInternal() || !e.context().userCache(); if (preload && !internal) @@ -150,11 +165,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (!hasNewVal && !hasOldVal) return; + GridDhtLocalPartition locPart = cctx.topology().localPartition(e.partition(), topVer, false); + + assert locPart != null; + + long updateIdx = locPart.nextContinuousQueryUpdateIndex(); + EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; boolean initialized = false; - boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE); boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { @@ -180,7 +200,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { evtType, key, newVal, - lsnr.oldValueRequired() ? oldVal : null); + lsnr.oldValueRequired() ? 
oldVal : null, + e.partition(), + updateIdx); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -200,6 +222,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { assert e != null; assert key != null; + assert Thread.holdsLock(e); + if (e.isInternal()) return; @@ -230,7 +254,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { EXPIRED, key, null, - lsnr.oldValueRequired() ? oldVal : null); + lsnr.oldValueRequired() ? oldVal : null, + e.partition(), + 0); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -466,10 +492,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { GridCacheEntryEx e = it.next(); + CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( + cctx.cacheId(), CREATED, e.key(), e.rawGet(), null, 0, 0); + next = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), - cctx, - new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null)); + cctx, entry); if (rmtFilter != null && !rmtFilter.evaluate(next)) next = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridContinuousQueryBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridContinuousQueryBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridContinuousQueryBatch.java new file mode 100644 index 0000000..9e47d1d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridContinuousQueryBatch.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.internal.processors.continuous.*; +import org.jsr166.*; + +import java.util.*; + +/** + * Continuous query batch. + */ +class GridContinuousQueryBatch extends GridContinuousBatchAdapter { + /** Update indexes. */ + private final Map<Integer, Long> updateIdxs = new ConcurrentHashMap8<>(); + + /** + * @return Update indexes. 
+ */ + Map<Integer, Long> updateIndexes() { + return updateIdxs; + } + + /** {@inheritDoc} */ + @Override public void add(Object obj) { + super.add(obj); + + CacheContinuousQueryEntry entry = (CacheContinuousQueryEntry)obj; + + updateIdxs.put(entry.partition(), entry.updateIndex()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java new file mode 100644 index 0000000..ded7e87 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.util.*; + +/** + * Continuous routine batch. + */ +public interface GridContinuousBatch { + /** + * Adds element to this batch. + * + * @param obj Element to add. 
+ */ + public void add(Object obj); + + /** + * Collects elements that are currently in this batch. + * + * @return Elements in this batch. + */ + public Collection<Object> collect(); + + /** + * @return Current batch size. + */ + public int size(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java new file mode 100644 index 0000000..ef4c069 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.jsr166.*; + +import java.util.*; + +/** + * Continuous routine batch adapter. + */ +public class GridContinuousBatchAdapter implements GridContinuousBatch { + /** Buffer. 
*/ + private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>(); + + /** {@inheritDoc} */ + @Override public void add(Object obj) { + assert obj != null; + + buf.add(obj); + } + + /** {@inheritDoc} */ + @Override public Collection<Object> collect() { + return buf; + } + + /** {@inheritDoc} */ + @Override public int size() { + return buf.sizex(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 79020da..72aaf0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -98,6 +98,24 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException; /** + * Creates new batch. + * + * @return New batch. + */ + public GridContinuousBatch createBatch(); + + /** + * Called when ack for a batch is received from client. + * + * @param routineId Routine ID. + * @param batch Acknowledged batch. + * @param ctx Kernal context. + * @throws IgniteCheckedException In case of error. + */ + public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) + throws IgniteCheckedException; + + /** * @return Topic for ordered notifications. If {@code null}, notifications * will be sent in non-ordered messages. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index daa9494..457f150 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -600,8 +600,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException In case of error. */ public void addNotification(UUID nodeId, - UUID routineId, - @Nullable Object obj, + final UUID routineId, + Object obj, @Nullable Object orderedTopic, boolean sync, boolean msg) @@ -615,7 +615,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (processorStopped) return; - RemoteRoutineInfo info = rmtInfos.get(routineId); + final RemoteRoutineInfo info = rmtInfos.get(routineId); if (info != null) { assert info.interval == 0 || !sync; @@ -628,7 +628,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { syncMsgFuts.put(futId, fut); try { - sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg); + sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null); } catch (IgniteCheckedException e) { syncMsgFuts.remove(futId); @@ -639,10 +639,24 @@ public class GridContinuousProcessor extends GridProcessorAdapter { fut.get(); } else { - Collection<Object> toSnd = info.add(obj); + final GridContinuousBatch batch = info.add(obj); + + if (batch != null) { + CI1<IgniteException> ackClosure = new CI1<IgniteException>() { + @Override public 
void apply(IgniteException e) { + if (e == null) { + try { + info.hnd.onBatchAcknowledged(routineId, batch, ctx); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to acknowledge batch: " + batch, ex); + } + } + } + }; - if (toSnd != null) - sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg); + sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackClosure); + } } } } @@ -667,6 +681,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. * @param msg If {@code true} then sent data is collection of messages. + * @param ackClosure Ack closure. * @throws IgniteCheckedException In case of error. */ private void sendNotification(UUID nodeId, @@ -674,7 +689,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Nullable IgniteUuid futId, Collection<Object> toSnd, @Nullable Object orderedTopic, - boolean msg) throws IgniteCheckedException { + boolean msg, + IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { assert nodeId != null; assert routineId != null; assert toSnd != null; @@ -682,7 +698,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg), - orderedTopic); + orderedTopic, + ackClosure); } /** @@ -800,6 +817,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false), + null, null); } catch (IgniteCheckedException e) { @@ -863,15 +881,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter { break; } - IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval(); + IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval(); - Collection<Object> toSnd = t.get1(); + final 
GridContinuousBatch batch = t.get1(); - if (toSnd != null && !toSnd.isEmpty()) { + if (batch != null && batch.size() > 0) { try { + Collection<Object> toSnd = batch.collect(); + boolean msg = toSnd.iterator().next() instanceof Message; - sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg); + CI1<IgniteException> ackClosure = new CI1<IgniteException>() { + @Override public void apply(IgniteException e) { + if (e == null) { + try { + info.hnd.onBatchAcknowledged(routineId, batch, ctx); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to acknowledge batch: " + batch, ex); + } + } + } + }; + + sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg, + ackClosure); } catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) @@ -949,9 +983,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param ackClosure Ack closure. * @throws IgniteCheckedException In case of error. */ - private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic) + private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic, + IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { assert nodeId != null; assert msg != null; @@ -959,7 +995,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ClusterNode node = ctx.discovery().node(nodeId); if (node != null) - sendWithRetries(node, msg, orderedTopic); + sendWithRetries(node, msg, orderedTopic, ackClosure); else throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId); } @@ -969,14 +1005,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. 
* If {@code null}, non-ordered message will be sent. + * @param ackClosure Ack closure. * @throws IgniteCheckedException In case of error. */ - private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic) - throws IgniteCheckedException { + private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic, + IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { assert node != null; assert msg != null; - sendWithRetries(F.asList(node), msg, orderedTopic); + sendWithRetries(F.asList(node), msg, orderedTopic, ackClosure); } /** @@ -984,10 +1021,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param ackClosure Ack closure. * @throws IgniteCheckedException In case of error. */ private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg, - @Nullable Object orderedTopic) throws IgniteCheckedException { + @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { assert !F.isEmpty(nodes); assert msg != null; @@ -1013,7 +1051,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { true); } else - ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL); + ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackClosure); break; } @@ -1114,8 +1152,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Lock. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); - /** Buffer. */ - private ConcurrentLinkedDeque8<Object> buf; + /** Batch. */ + private GridContinuousBatch batch; /** Last send time. 
*/ private long lastSndTime = U.currentTimeMillis(); @@ -1146,7 +1184,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { this.interval = interval; this.autoUnsubscribe = autoUnsubscribe; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); } /** @@ -1175,20 +1213,22 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param obj Object to add. - * @return Object to send or {@code null} if there is nothing to send for now. + * @return Batch to send or {@code null} if there is nothing to send for now. */ - @Nullable Collection<Object> add(@Nullable Object obj) { - ConcurrentLinkedDeque8 buf0 = null; + @Nullable GridContinuousBatch add(Object obj) { + assert obj != null; + + GridContinuousBatch toSnd = null; - if (buf.sizex() >= bufSize - 1) { + if (batch.size() >= bufSize - 1) { lock.writeLock().lock(); try { - buf.add(obj); + batch.add(obj); - buf0 = buf; + toSnd = batch; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); if (interval > 0) lastSndTime = U.currentTimeMillis(); @@ -1201,34 +1241,25 @@ public class GridContinuousProcessor extends GridProcessorAdapter { lock.readLock().lock(); try { - buf.add(obj); + batch.add(obj); } finally { lock.readLock().unlock(); } } - Collection<Object> toSnd = null; - - if (buf0 != null) { - toSnd = new ArrayList<>(buf0.sizex()); - - for (Object o : buf0) - toSnd.add(o); - } - return toSnd; } /** - * @return Tuple with objects to sleep (or {@code null} if there is nothing to + * @return Tuple with batch to send (or {@code null} if there is nothing to * send for now) and time interval after next check is needed. 
*/ @SuppressWarnings("TooBroadScope") - IgniteBiTuple<Collection<Object>, Long> checkInterval() { + IgniteBiTuple<GridContinuousBatch, Long> checkInterval() { assert interval > 0; - Collection<Object> toSnd = null; + GridContinuousBatch toSnd = null; long diff; long now = U.currentTimeMillis(); @@ -1238,10 +1269,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { diff = now - lastSndTime; - if (diff >= interval && !buf.isEmpty()) { - toSnd = buf; + if (diff >= interval && batch.size() > 0) { + toSnd = batch; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); lastSndTime = now; }