http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index e71dd65..81184a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -80,6 +80,9 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** Flag indicating whether cache operation requires a previous value. */ private boolean retVal; + /** {@code True} if first lock request for lock operation sent from client node. */ + private boolean firstClientReq; + /** * Empty constructor required for {@link Externalizable}. */ @@ -98,6 +101,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param implicitTx Flag to indicate that transaction is implicit. * @param implicitSingleTx Implicit-transaction-with-one-key flag. * @param isRead Indicates whether implicit lock is for read or write operation. + * @param retVal Return value flag. * @param isolation Transaction isolation. * @param isInvalidate Invalidation flag. * @param timeout Lock timeout. @@ -108,6 +112,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param taskNameHash Task name hash code. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. + * @param firstClientReq {@code True} if first lock request for lock operation sent from client node. 
*/ public GridNearLockRequest( int cacheId, @@ -130,7 +135,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest { @Nullable UUID subjId, int taskNameHash, long accessTtl, - boolean skipStore + boolean skipStore, + boolean firstClientReq ) { super( cacheId, @@ -158,11 +164,19 @@ public class GridNearLockRequest extends GridDistributedLockRequest { this.taskNameHash = taskNameHash; this.accessTtl = accessTtl; this.retVal = retVal; + this.firstClientReq = firstClientReq; dhtVers = new GridCacheVersion[keyCnt]; } /** + * @return {@code True} if first lock request for lock operation sent from client node. + */ + public boolean firstClientRequest() { + return firstClientReq; + } + + /** * @return Topology version. */ @Override public AffinityTopologyVersion topologyVersion() { @@ -368,60 +382,66 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); case 24: - if (!writer.writeBoolean("hasTransforms", hasTransforms)) + if (!writer.writeBoolean("firstClientReq", firstClientReq)) return false; writer.incrementState(); case 25: - if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) + if (!writer.writeBoolean("hasTransforms", hasTransforms)) return false; writer.incrementState(); case 26: - if (!writer.writeBoolean("implicitTx", implicitTx)) + if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) return false; writer.incrementState(); case 27: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeBoolean("implicitTx", implicitTx)) return false; writer.incrementState(); case 28: - if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 29: - if (!writer.writeBoolean("retVal", retVal)) + if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) return false; writer.incrementState(); case 30: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("retVal", retVal)) 
return false; writer.incrementState(); case 31: - if (!writer.writeBoolean("syncCommit", syncCommit)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 32: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("syncCommit", syncCommit)) return false; writer.incrementState(); case 33: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 34: if (!writer.writeMessage("topVer", topVer)) return false; @@ -468,7 +488,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 24: - hasTransforms = reader.readBoolean("hasTransforms"); + firstClientReq = reader.readBoolean("firstClientReq"); if (!reader.isLastRead()) return false; @@ -476,7 +496,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 25: - implicitSingleTx = reader.readBoolean("implicitSingleTx"); + hasTransforms = reader.readBoolean("hasTransforms"); if (!reader.isLastRead()) return false; @@ -484,7 +504,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 26: - implicitTx = reader.readBoolean("implicitTx"); + implicitSingleTx = reader.readBoolean("implicitSingleTx"); if (!reader.isLastRead()) return false; @@ -492,7 +512,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 27: - miniId = reader.readIgniteUuid("miniId"); + implicitTx = reader.readBoolean("implicitTx"); if (!reader.isLastRead()) return false; @@ -500,7 +520,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 28: - onePhaseCommit = reader.readBoolean("onePhaseCommit"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -508,7 +528,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 29: - 
retVal = reader.readBoolean("retVal"); + onePhaseCommit = reader.readBoolean("onePhaseCommit"); if (!reader.isLastRead()) return false; @@ -516,7 +536,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 30: - subjId = reader.readUuid("subjId"); + retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) return false; @@ -524,7 +544,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 31: - syncCommit = reader.readBoolean("syncCommit"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -532,7 +552,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 32: - taskNameHash = reader.readInt("taskNameHash"); + syncCommit = reader.readBoolean("syncCommit"); if (!reader.isLastRead()) return false; @@ -540,6 +560,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 33: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 34: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -559,7 +587,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 34; + return 35; } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java index 20928de..f324198 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -58,6 +59,9 @@ public class GridNearLockResponse extends GridDistributedLockResponse { /** Filter evaluation results for fast-commit transactions. */ private boolean[] filterRes; + /** Not {@code null} if client node should remap lock request. */ + private AffinityTopologyVersion clientRemapVer; + /** * Empty constructor (required by {@link Externalizable}). */ @@ -73,6 +77,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { * @param filterRes {@code True} if need to allocate array for filter evaluation results. * @param cnt Count. * @param err Error. + * @param clientRemapVer Not {@code null} if client node should remap lock request. 
*/ public GridNearLockResponse( int cacheId, @@ -81,13 +86,15 @@ public class GridNearLockResponse extends GridDistributedLockResponse { IgniteUuid miniId, boolean filterRes, int cnt, - Throwable err + Throwable err, + AffinityTopologyVersion clientRemapVer ) { super(cacheId, lockVer, futId, cnt, err); assert miniId != null; this.miniId = miniId; + this.clientRemapVer = clientRemapVer; dhtVers = new GridCacheVersion[cnt]; mappedVers = new GridCacheVersion[cnt]; @@ -97,6 +104,13 @@ public class GridNearLockResponse extends GridDistributedLockResponse { } /** + * @return Not {@code null} if client node should remap lock request. + */ + @Nullable public AffinityTopologyVersion clientRemapVersion() { + return clientRemapVer; + } + + /** * Gets pending versions that are less than {@link #version()}. * * @return Pending versions. @@ -192,30 +206,36 @@ public class GridNearLockResponse extends GridDistributedLockResponse { switch (writer.state()) { case 11: - if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("clientRemapVer", clientRemapVer)) return false; writer.incrementState(); case 12: - if (!writer.writeBooleanArray("filterRes", filterRes)) + if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 13: - if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG)) + if (!writer.writeBooleanArray("filterRes", filterRes)) return false; writer.incrementState(); case 14: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 15: + if (!writer.writeIgniteUuid("miniId", miniId)) + return false; + + writer.incrementState(); + + case 16: if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) return false; @@ -238,7 +258,7 @@ public class GridNearLockResponse extends 
GridDistributedLockResponse { switch (reader.state()) { case 11: - dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); + clientRemapVer = reader.readMessage("clientRemapVer"); if (!reader.isLastRead()) return false; @@ -246,7 +266,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 12: - filterRes = reader.readBooleanArray("filterRes"); + dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); if (!reader.isLastRead()) return false; @@ -254,7 +274,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 13: - mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class); + filterRes = reader.readBooleanArray("filterRes"); if (!reader.isLastRead()) return false; @@ -262,7 +282,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 14: - miniId = reader.readIgniteUuid("miniId"); + mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class); if (!reader.isLastRead()) return false; @@ -270,6 +290,14 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 15: + miniId = reader.readIgniteUuid("miniId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: pending = reader.readCollection("pending", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -289,7 +317,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 16; + return 17; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 4f74303..44b7997 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -221,18 +221,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd if (topVer != null) { tx.topologyVersion(topVer); - prepare0(); + prepare0(false); return; } - prepareOnTopology(); + prepareOnTopology(false, null); } /** - * + * @param remap Remap flag. + * @param c Optional closure to run after map. */ - private void prepareOnTopology() { + private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) { GridDhtTopologyFuture topFut = topologyReadLock(); try { @@ -265,16 +266,22 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd return; } - tx.topologyVersion(topFut.topologyVersion()); + if (remap) + tx.onRemap(topFut.topologyVersion()); + else + tx.topologyVersion(topFut.topologyVersion()); + + prepare0(remap); - prepare0(); + if (c != null) + c.run(); } else { topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { - prepareOnTopology(); + prepareOnTopology(remap, c); } }); } @@ -346,10 +353,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd /** * Initializes future. + * + * @param remap Remap flag. 
*/ - private void prepare0() { + private void prepare0(boolean remap) { try { - if (!tx.state(PREPARING)) { + boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); + + if (!txStateCheck) { if (tx.setRollbackOnly()) { if (tx.timedOut()) onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + @@ -366,7 +377,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } // Make sure to add future before calling prepare. - cctx.mvcc().addFuture(this); + if (!remap) + cctx.mvcc().addFuture(this); prepare( tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), @@ -502,7 +514,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd tx.implicitSingle(), m.explicitLock(), tx.subjectId(), - tx.taskNameHash()); + tx.taskNameHash(), + m.clientFirst()); for (IgniteTxEntry txEntry : m.writes()) { if (txEntry.op() == TRANSFORM) @@ -560,13 +573,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * @param entry Transaction entry. * @param topVer Topology version. * @param cur Current mapping. + * @param waitLock Wait lock flag. * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key. * @return Mapping. */ private GridDistributedTxMapping map( IgniteTxEntry entry, AffinityTopologyVersion topVer, - GridDistributedTxMapping cur, + @Nullable GridDistributedTxMapping cur, boolean waitLock ) throws IgniteCheckedException { GridCacheContext cacheCtx = entry.context(); @@ -599,10 +613,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { + boolean clientFirst = cur == null && cctx.kernalContext().clientNode(); + cur = new GridDistributedTxMapping(primary); // Initialize near flag right away. 
cur.near(cacheCtx.isNear()); + + cur.clientFirst(clientFirst); } cur.add(entry); @@ -748,18 +766,47 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd onError(nodeId, mappings, res.error()); } else { - onPrepareResponse(m, res); + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); + assert m.clientFirst(); + + IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); + + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + remap(); + } + }); + } + else + remap(); + } + else { + onPrepareResponse(m, res); - // Proceed prepare before finishing mini future. - if (mappings != null) - proceedPrepare(mappings); + // Proceed prepare before finishing mini future. + if (mappings != null) + proceedPrepare(mappings); - // Finish this mini future. - onDone(tx); + // Finish this mini future. + onDone(tx); + } } } } + /** + * + */ + private void remap() { + prepareOnTopology(true, new Runnable() { + @Override public void run() { + onDone(tx); + } + }); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index bce62c1..7006114 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -84,6 +84,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { if (!isDone()) { + assert res.clientRemapVersion() == null : res; + for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { MiniFuture f = (MiniFuture)fut; @@ -187,7 +189,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA tx.implicitSingle(), m.explicitLock(), tx.subjectId(), - tx.taskNameHash()); + tx.taskNameHash(), + false); for (IgniteTxEntry txEntry : m.writes()) { if (txEntry.op() == TRANSFORM) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index df7a65f..696acfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -403,7 +403,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> assert nodeId != null; assert res != null; - GridNearLockFuture<K, V> fut = (GridNearLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(), + GridNearLockFuture fut = 
(GridNearLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId()); if (fut != null) @@ -423,7 +423,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); - GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx, + GridNearLockFuture fut = new GridNearLockFuture(ctx, keys, (GridNearTxLocal)tx, isRead, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index c38965d..fa8877a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -56,8 +56,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { private static final long serialVersionUID = 0L; /** DHT mappings. */ - private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = - new ConcurrentHashMap8<>(); + private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>(); /** Future. 
*/ @GridToStringExclude @@ -65,13 +64,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** */ @GridToStringExclude - private final AtomicReference<GridNearTxFinishFuture> commitFut = - new AtomicReference<>(); + private final AtomicReference<GridNearTxFinishFuture> commitFut = new AtomicReference<>(); /** */ @GridToStringExclude - private final AtomicReference<GridNearTxFinishFuture> rollbackFut = - new AtomicReference<>(); + private final AtomicReference<GridNearTxFinishFuture> rollbackFut = new AtomicReference<>(); /** Entries to lock on next step of prepare stage. */ private Collection<IgniteTxEntry> optimisticLockEntries = Collections.emptyList(); @@ -85,6 +82,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** Info for entries accessed locally in optimistic transaction. */ private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap; + /** */ + private boolean hasRemoteLocks; + /** * Empty constructor required for {@link Externalizable}. */ @@ -97,6 +97,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @param implicit Implicit flag. * @param implicitSingle Implicit with one key flag. * @param sys System flag. + * @param plc IO policy. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -1185,6 +1186,36 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ + @Override public void onRemap(AffinityTopologyVersion topVer) { + assert cctx.kernalContext().clientNode(); + + mapped.set(false); + nearLocallyMapped = false; + colocatedLocallyMapped = false; + txNodes = null; + onePhaseCommit = false; + nearMap.clear(); + dhtMap.clear(); + mappings.clear(); + + this.topVer.set(topVer); + } + + /** + * @param hasRemoteLocks {@code True} if tx has remote locks acquired. + */ + public void hasRemoteLocks(boolean hasRemoteLocks) { + this.hasRemoteLocks = hasRemoteLocks; + } + + /** + * @return {@code True} if tx has remote locks acquired. 
+ */ + public boolean hasRemoteLocks() { + return hasRemoteLocks; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index a08637d..b602a7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -75,6 +75,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** Task name hash. */ private int taskNameHash; + /** {@code True} if first optimistic tx prepare request sent from client node. */ + private boolean firstClientReq; + /** * Empty constructor required for {@link Externalizable}. */ @@ -92,8 +95,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param txNodes Transaction nodes mapping. * @param last {@code True} if this last prepare request for node. * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. + * @param onePhaseCommit One phase commit flag. + * @param retVal Return value flag. + * @param implicitSingle Implicit single flag. + * @param explicitLock Explicit lock flag. * @param subjId Subject ID. * @param taskNameHash Task name hash. 
+ * @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node. */ public GridNearTxPrepareRequest( IgniteUuid futId, @@ -110,11 +118,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean implicitSingle, boolean explicitLock, @Nullable UUID subjId, - int taskNameHash + int taskNameHash, + boolean firstClientReq ) { super(tx, reads, writes, txNodes, onePhaseCommit); assert futId != null; + assert !firstClientReq || tx.optimistic() : tx; this.futId = futId; this.topVer = topVer; @@ -126,6 +136,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { this.explicitLock = explicitLock; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.firstClientReq = firstClientReq; + } + + /** + * @return {@code True} if first optimistic tx prepare request sent from client node. + */ + public boolean firstClientRequest() { + return firstClientReq; } /** @@ -273,60 +291,66 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { writer.incrementState(); case 24: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeBoolean("firstClientReq", firstClientReq)) return false; writer.incrementState(); case 25: - if (!writer.writeBoolean("implicitSingle", implicitSingle)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 26: - if (!writer.writeBoolean("last", last)) + if (!writer.writeBoolean("implicitSingle", implicitSingle)) return false; writer.incrementState(); case 27: - if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) + if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); case 28: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) return false; writer.incrementState(); case 29: - if (!writer.writeBoolean("near", near)) + if 
(!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 30: - if (!writer.writeBoolean("retVal", retVal)) + if (!writer.writeBoolean("near", near)) return false; writer.incrementState(); case 31: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); case 32: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 33: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 34: if (!writer.writeMessage("topVer", topVer)) return false; @@ -357,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 24: - futId = reader.readIgniteUuid("futId"); + firstClientReq = reader.readBoolean("firstClientReq"); if (!reader.isLastRead()) return false; @@ -365,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 25: - implicitSingle = reader.readBoolean("implicitSingle"); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -373,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 26: - last = reader.readBoolean("last"); + implicitSingle = reader.readBoolean("implicitSingle"); if (!reader.isLastRead()) return false; @@ -381,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 27: - lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID); + last = reader.readBoolean("last"); if (!reader.isLastRead()) return false; @@ -389,7 +413,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 28: - miniId = reader.readIgniteUuid("miniId"); + lastBackups = 
reader.readCollection("lastBackups", MessageCollectionItemType.UUID); if (!reader.isLastRead()) return false; @@ -397,7 +421,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 29: - near = reader.readBoolean("near"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -405,7 +429,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 30: - retVal = reader.readBoolean("retVal"); + near = reader.readBoolean("near"); if (!reader.isLastRead()) return false; @@ -413,7 +437,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 31: - subjId = reader.readUuid("subjId"); + retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) return false; @@ -421,7 +445,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 32: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -429,6 +453,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 33: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 34: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -448,7 +480,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 34; + return 35; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index f8c07f7..0f0b2c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -27,6 +28,7 @@ import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@ -83,6 +85,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse @GridDirectCollection(IgniteTxKey.class) private Collection<IgniteTxKey> filterFailedKeys; + /** Not {@code null} if client node should remap transaction. */ + private AffinityTopologyVersion clientRemapVer; + /** * Empty constructor required by {@link Externalizable}. */ @@ -95,9 +100,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse * @param futId Future ID. * @param miniId Mini future ID. * @param dhtVer DHT version. + * @param writeVer Write version. * @param invalidParts Invalid partitions. * @param retVal Return value. * @param err Error. + * @param clientRemapVer Not {@code null} if client node should remap transaction. 
*/ public GridNearTxPrepareResponse( GridCacheVersion xid, @@ -107,7 +114,8 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse GridCacheVersion writeVer, Collection<Integer> invalidParts, GridCacheReturn retVal, - Throwable err + Throwable err, + AffinityTopologyVersion clientRemapVer ) { super(xid, err); @@ -121,6 +129,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse this.writeVer = writeVer; this.invalidParts = invalidParts; this.retVal = retVal; + this.clientRemapVer = clientRemapVer; + } + + /** + * @return Not {@code null} if client node should remap transaction. + */ + @Nullable public AffinityTopologyVersion clientRemapVersion() { + return clientRemapVer; } /** @@ -330,60 +346,66 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse switch (writer.state()) { case 10: - if (!writer.writeMessage("dhtVer", dhtVer)) + if (!writer.writeMessage("clientRemapVer", clientRemapVer)) return false; writer.incrementState(); case 11: - if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("dhtVer", dhtVer)) return false; writer.incrementState(); case 12: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 13: - if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 14: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 15: - if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 16: - if
(!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 17: - if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 18: - if (!writer.writeMessage("retVal", retVal)) + if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 19: + if (!writer.writeMessage("retVal", retVal)) + return false; + + writer.incrementState(); + + case 20: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -406,7 +428,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse switch (reader.state()) { case 10: - dhtVer = reader.readMessage("dhtVer"); + clientRemapVer = reader.readMessage("clientRemapVer"); if (!reader.isLastRead()) return false; @@ -414,7 +436,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 11: - filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG); + dhtVer = reader.readMessage("dhtVer"); if (!reader.isLastRead()) return false; @@ -422,7 +444,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 12: - futId = reader.readIgniteUuid("futId"); + filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -430,7 +452,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 13: - invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -438,7 
+460,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 14: - miniId = reader.readIgniteUuid("miniId"); + invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -446,7 +468,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 15: - ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -454,7 +476,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 16: - ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG); + ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -462,7 +484,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 17: - pending = reader.readCollection("pending", MessageCollectionItemType.MSG); + ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -470,7 +492,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 18: - retVal = reader.readMessage("retVal"); + pending = reader.readCollection("pending", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -478,6 +500,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 19: + retVal = reader.readMessage("retVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -497,7 +527,7 @@ public class GridNearTxPrepareResponse extends 
GridDistributedTxPrepareResponse /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 20; + return 21; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 6120e25..4adcff5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -41,7 +41,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { private static final long serialVersionUID = 0L; /** */ - private GridCachePreloader<K,V> preldr; + private GridCachePreloader preldr; /** * Empty constructor required by {@link Externalizable}. 
@@ -56,7 +56,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { public GridLocalCache(GridCacheContext<K, V> ctx) { super(ctx, ctx.config().getStartSize()); - preldr = new GridCachePreloaderAdapter<>(ctx); + preldr = new GridCachePreloaderAdapter(ctx); } /** {@inheritDoc} */ @@ -65,7 +65,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public GridCachePreloader<K, V> preloader() { + @Override public GridCachePreloader preloader() { return preldr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 819b0f0..bcbdec4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -53,7 +53,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe(); /** */ - private GridCachePreloader<K,V> preldr; + private GridCachePreloader preldr; /** * Empty constructor required by {@link Externalizable}. 
@@ -68,7 +68,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { public GridLocalAtomicCache(GridCacheContext<K, V> ctx) { super(ctx, ctx.config().getStartSize()); - preldr = new GridCachePreloaderAdapter<>(ctx); + preldr = new GridCachePreloaderAdapter(ctx); } /** {@inheritDoc} */ @@ -94,7 +94,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public GridCachePreloader<K, V> preloader() { + @Override public GridCachePreloader preloader() { return preldr; } @@ -119,7 +119,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { @Override public boolean put(K key, V val, CacheEntryPredicate[] filter) throws IgniteCheckedException { A.notNull(key, "key", val, "val"); - return (Boolean)updateAllInternal(UPDATE, + boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + long start = statsEnabled ? System.nanoTime() : 0L; + + boolean res = (Boolean)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(val), null, @@ -129,6 +133,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { filter, ctx.writeThrough(), ctx.readThrough()); + + if (statsEnabled) + metrics0().addPutTimeNanos(System.nanoTime() - start); + + return res; } /** {@inheritDoc} */ @@ -268,6 +277,10 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException { + boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + long start = statsEnabled ? 
System.nanoTime() : 0L; + updateAllInternal(UPDATE, m.keySet(), m.values(), @@ -278,6 +291,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { CU.empty0(), ctx.writeThrough(), ctx.readThrough()); + + if (statsEnabled) + metrics0().addPutTimeNanos(System.nanoTime() - start); } /** {@inheritDoc} */ @@ -727,7 +743,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { final ExpiryPolicy expiry = expiryPerCall(); - return asyncOp(new Callable<Object>() { + IgniteInternalFuture fut = asyncOp(new Callable<Object>() { @Override public Object call() throws Exception { return updateAllInternal(op, keys, @@ -741,6 +757,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { readThrough); } }); + + if (ctx.config().isStatisticsEnabled()) + fut.listen(new UpdatePutTimeStatClosure(metrics0(), System.nanoTime())); + + return fut; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 4b1fc87..7f0a5ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -348,17 +348,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param duration Duration. */ public void onExecuted(Object res, Throwable err, long startTime, long duration) { - boolean fail = err != null; - - // Update own metrics. - metrics.onQueryExecute(duration, fail); - - // Update metrics in query manager. 
- cctx.queries().onMetricsUpdate(duration, fail); - - if (log.isDebugEnabled()) - log.debug("Query execution finished [qry=" + this + ", startTime=" + startTime + - ", duration=" + duration + ", fail=" + fail + ", res=" + res + ']'); + GridQueryProcessor.onExecuted(cctx, metrics, res, err, startTime, duration, log); } /** {@inheritDoc} */ @@ -376,10 +366,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return execute(null, rmtTransform, args); } + /** {@inheritDoc} */ @Override public QueryMetrics metrics() { return metrics.copy(); } + /** {@inheritDoc} */ @Override public void resetMetrics() { metrics = new GridCacheQueryMetricsAdapter(); } @@ -470,10 +462,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) { assert cctx != null; - return F.view(CU.allNodes(cctx), new P1<ClusterNode>() { + Collection<ClusterNode> affNodes = CU.affinityNodes(cctx); + + if (prj == null) + return affNodes; + + return F.view(affNodes, new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { - return cctx.discovery().cacheAffinityNode(n, cctx.name()) && - (prj == null || prj.node(n.id()) != null); + return prj.node(n.id()) != null; } }); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java index 2999e7b..15eb368 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java @@ -43,6 +43,8 @@ public class GridCacheQueryErrorFuture<T> extends GridFinishedFuture<Collection< /** {@inheritDoc} */ @Nullable @Override public T next() throws IgniteCheckedException { + get(); + return null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 16a8028..32e9d63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -773,7 +773,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator(); + private Iterator<K> iter = backups ? 
prj.keySetx().iterator() : prj.primaryKeySet().iterator(); { advance(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 759a949..6277c5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -250,8 +250,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. 
*/ - public UUID executeQuery(CacheEntryUpdatedListener locLsnr, CacheEntryEventSerializableFilter rmtFilter, - int bufSize, long timeInterval, boolean autoUnsubscribe, ClusterGroup grp) throws IgniteCheckedException { + public UUID executeQuery(CacheEntryUpdatedListener locLsnr, + CacheEntryEventSerializableFilter rmtFilter, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + ClusterGroup grp) throws IgniteCheckedException + { return executeQuery0( locLsnr, rmtFilter, @@ -301,7 +306,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { try { cctx.kernalContext().continuous().stopRoutine(routineId).get(); } - catch (IgniteCheckedException e) { + catch (IgniteCheckedException | IgniteException e) { if (log.isDebugEnabled()) log.debug("Failed to stop internal continuous query: " + e.getMessage()); } @@ -357,9 +362,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. 
*/ - private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, final CacheEntryEventSerializableFilter rmtFilter, - int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean notifyExisting, - boolean oldValRequired, boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException { + private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, + final CacheEntryEventSerializableFilter rmtFilter, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + boolean internal, + boolean notifyExisting, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + ClusterGroup grp) throws IgniteCheckedException + { cctx.checkSecurity(SecurityPermission.CACHE_READ); if (grp == null) @@ -745,7 +759,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - impl = (CacheEntryEventSerializableFilter)in.readObject(); + impl = (CacheEntryEventFilter)in.readObject(); types = in.readByte(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java index 5fde622..02fe679 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.store; -import org.apache.ignite.*; import org.apache.ignite.configuration.*; import 
org.apache.ignite.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java index d9f50ac..a14df6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java @@ -68,6 +68,11 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> { public boolean isWriteThrough(); /** + * @return {@code True} if write-behind is enabled. + */ + public boolean isWriteBehind(); + + /** * @return Whether DHT transaction can write to store from DHT. */ public boolean isWriteToStoreFromDht(); @@ -160,7 +165,7 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> { * @param commit Commit. * @throws IgniteCheckedException If failed. */ - public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException; + public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException; /** * End session initiated by write-behind store. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index f9a966c..b4a146a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -59,11 +59,20 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt private ThreadLocal<SessionData> sesHolder; /** */ + private ThreadLocalSession locSes; + + /** */ private boolean locStore; /** */ private boolean writeThrough; + /** */ + private Collection<CacheStoreSessionListener> sesLsnrs; + + /** */ + private boolean globalSesLsnrs; + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException { @@ -84,14 +93,15 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore); if (sesHolder0 == null) { - ThreadLocalSession locSes = new ThreadLocalSession(); + sesHolder0 = new ThreadLocal<>(); - if (ctx.resource().injectStoreSession(cfgStore, locSes)) { - sesHolder0 = locSes.sesHolder; + locSes = new ThreadLocalSession(sesHolder0); + if (ctx.resource().injectStoreSession(cfgStore, locSes)) sesHolders.put(cfgStore, sesHolder0); - } } + else + locSes = new ThreadLocalSession(sesHolder0); } sesHolder = sesHolder0; @@ -148,6 +158,24 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt throw new IgniteCheckedException("Failed to start cache store: " + e, e); } } + + CacheConfiguration cfg = cctx.config(); + + if (cfgStore != null && !cfg.isWriteThrough() && !cfg.isReadThrough()) { + U.quietAndWarn(log, + "Persistence store is configured, but both read-through and write-through are disabled. This " + + "configuration makes sense if the store implements loadCache method only. If this is the " + + "case, ignore this warning. Otherwise, fix the configuration for cache: " + cfg.getName(), + "Persistence store is configured, but both read-through and write-through are disabled."); + } + + sesLsnrs = CU.startStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories()); + + if (sesLsnrs == null) { + sesLsnrs = cctx.shared().storeSessionListeners(); + + globalSesLsnrs = true; + } } /** {@inheritDoc} */ @@ -164,6 +192,15 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt U.error(log(), "Failed to stop cache store.", e); } } + + if (!globalSesLsnrs) { + try { + CU.stopStoreSessionListeners(cctx.kernalContext(), sesLsnrs); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to stop store session listeners for cache: " + cctx.name(), e); + } + } } /** {@inheritDoc} */ @@ -215,14 +252,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; Object val = null; try { val = singleThreadGate.load(storeKey); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -234,7 +271,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -264,8 +301,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** 
{@inheritDoc} */ + @Override public boolean isWriteBehind() { + return cctx.config().isWriteBehindEnabled(); + } + + /** {@inheritDoc} */ @Override public boolean isWriteToStoreFromDht() { - return cctx.config().isWriteBehindEnabled() || locStore; + return isWriteBehind() || locStore; } /** {@inheritDoc} */ @@ -349,7 +391,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { @@ -380,7 +422,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt else singleThreadGate.loadAll(keys0, c); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -392,7 +434,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -408,7 +450,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(null); - boolean thewEx = true; + boolean threwEx = true; try { store.loadCache(new IgniteBiInClosure<Object, Object>() { @@ -431,7 +473,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } }, args); - thewEx = false; + threwEx = false; } catch (CacheLoaderException e) { throw new IgniteCheckedException(e); @@ -440,7 +482,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(null, thewEx); + sessionEnd0(null, threwEx); } if (log.isDebugEnabled()) @@ -473,12 +515,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.write(new CacheEntryImpl<>(key, locStore ? 
F.t(val, ver) : val)); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -490,7 +532,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheWriterException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -522,12 +564,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.writeAll(entries); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -548,7 +590,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(e); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -576,12 +618,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean thewEx = true; + boolean threwEx = true; try { store.delete(key); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -593,7 +635,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheWriterException(e)); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -606,8 +648,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ - @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) - throws IgniteCheckedException { + @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) throws IgniteCheckedException { if (F.isEmpty(keys)) return true; @@ -625,12 +666,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sessionInit0(tx); - boolean 
thewEx = true; + boolean threwEx = true; try { store.deleteAll(keys0); - thewEx = false; + threwEx = false; } catch (ClassCastException e) { handleClassCastException(e); @@ -645,7 +686,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(e); } finally { - sessionEnd0(tx, thewEx); + sessionEnd0(tx, threwEx); } if (log.isDebugEnabled()) @@ -669,16 +710,27 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ - @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException { + @Override public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException { assert store != null; sessionInit0(tx); try { - store.sessionEnd(commit); + if (sesLsnrs != null) { + for (CacheStoreSessionListener lsnr : sesLsnrs) + lsnr.onSessionEnd(locSes, commit); + } + + if (!sesHolder.get().ended(store)) + store.sessionEnd(commit); + } + catch (Throwable e) { + last = true; + + throw e; } finally { - if (sesHolder != null) { + if (last && sesHolder != null) { sesHolder.set(null); tx.removeMeta(SES_ATTR); @@ -715,10 +767,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt * @param tx Current transaction. 
*/ private void sessionInit0(@Nullable IgniteInternalTx tx) { - if (sesHolder == null) - return; - - assert sesHolder.get() == null; + assert sesHolder != null; SessionData ses; @@ -738,6 +787,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt ses = new SessionData(null, cctx.name()); sesHolder.set(ses); + + if (sesLsnrs != null && !ses.started(this)) { + for (CacheStoreSessionListener lsnr : sesLsnrs) + lsnr.onSessionStart(locSes); + } } /** @@ -745,8 +799,16 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt */ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { try { - if (tx == null) - store.sessionEnd(threwEx); + if (tx == null) { + if (sesLsnrs != null) { + for (CacheStoreSessionListener lsnr : sesLsnrs) + lsnr.onSessionEnd(locSes, !threwEx); + } + + assert !sesHolder.get().ended(store); + + store.sessionEnd(!threwEx); + } } catch (Exception e) { if (!threwEx) @@ -788,6 +850,16 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt @GridToStringInclude private Map<Object, Object> props; + /** */ + private Object attachment; + + /** */ + private final Set<CacheStoreManager> started = + new GridSetWrapper<>(new IdentityHashMap<CacheStoreManager, Object>()); + + /** */ + private final Set<CacheStore> ended = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>()); + /** * @param tx Current transaction. * @param cacheName Cache name. @@ -815,6 +887,24 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** + * @param attachment Attachment. + */ + private Object attach(Object attachment) { + Object prev = this.attachment; + + this.attachment = attachment; + + return prev; + } + + /** + * @return Attachment. + */ + private Object attachment() { + return attachment; + } + + /** * @return Cache name. 
*/ private String cacheName() { @@ -828,6 +918,21 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt this.cacheName = cacheName; } + /** + * @return If session is started. + */ + private boolean started(CacheStoreManager mgr) { + return !started.add(mgr); + } + + /** + * @param store Cache store. + * @return Whether session already ended on this store instance. + */ + private boolean ended(CacheStore store) { + return !ended.add(store); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SessionData.class, this, "tx", CU.txString(tx)); @@ -839,7 +944,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt */ private static class ThreadLocalSession implements CacheStoreSession { /** */ - private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>(); + private final ThreadLocal<SessionData> sesHolder; + + /** + * @param sesHolder Session holder. + */ + private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) { + this.sesHolder = sesHolder; + } /** {@inheritDoc} */ @Nullable @Override public Transaction transaction() { @@ -854,6 +966,20 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ + @Override public Object attach(@Nullable Object attachment) { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? ses0.attach(attachment) : null; + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T attachment() { + SessionData ses0 = sesHolder.get(); + + return ses0 != null ? 
(T)ses0.attachment() : null; + } + + /** {@inheritDoc} */ @Override public <K1, V1> Map<K1, V1> properties() { SessionData ses0 = sesHolder.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 5f877ec..cb86e0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -706,4 +706,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { * @return Public API proxy. */ public TransactionProxy proxy(); + + /** + * @param topVer New topology version. 
+ */ + public void onRemap(AffinityTopologyVersion topVer); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index eb8825e..9e8950f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -184,7 +184,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>(); /** Topology version. */ - private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE); + protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE); /** Mutex. */ private final Lock lock = new ReentrantLock(); @@ -405,7 +405,21 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public boolean storeUsed() { - return storeEnabled() && store() != null; + if (!storeEnabled()) + return false; + + Collection<Integer> cacheIds = activeCacheIds(); + + if (!cacheIds.isEmpty()) { + for (int cacheId : cacheIds) { + CacheStoreManager store = cctx.cacheContext(cacheId).store(); + + if (store.configured()) + return true; + } + } + + return false; } /** @@ -413,13 +427,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * * @return Store manager. 
*/ - protected CacheStoreManager store() { - if (!activeCacheIds().isEmpty()) { - int cacheId = F.first(activeCacheIds()); + protected Collection<CacheStoreManager> stores() { + Collection<Integer> cacheIds = activeCacheIds(); + + if (!cacheIds.isEmpty()) { + Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size()); - CacheStoreManager store = cctx.cacheContext(cacheId).store(); + for (int cacheId : cacheIds) { + CacheStoreManager store = cctx.cacheContext(cacheId).store(); - return store.configured() ? store : null; + if (store.configured()) + stores.add(store); + } + + return stores; } return null; @@ -493,13 +514,17 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public void onRemap(AffinityTopologyVersion topVer) { + assert false : this; + } + + /** {@inheritDoc} */ @Override public boolean hasTransforms() { return transform; } /** {@inheritDoc} */ - @Override - public boolean markPreparing() { + @Override public boolean markPreparing() { return preparing.compareAndSet(false, true); } @@ -1716,6 +1741,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public void onRemap(AffinityTopologyVersion topVer) { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ @Override public boolean empty() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); }