GG-9655 - Fixing tests after merge.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/be5b908c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/be5b908c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/be5b908c Branch: refs/heads/sprint-1 Commit: be5b908c0c1c358a63fffe1ec5f4e6586eec3368 Parents: f004f9d Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Feb 2 17:18:40 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Feb 2 17:18:40 2015 -0800 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 9 +- .../distributed/GridDistributedTxMapping.java | 6 +- .../distributed/dht/GridDhtLockFuture.java | 8 +- .../distributed/dht/GridDhtTxFinishFuture.java | 22 --- .../distributed/dht/GridDhtTxFinishRequest.java | 114 ++------------- .../cache/distributed/dht/GridDhtTxLocal.java | 39 +++++- .../distributed/dht/GridDhtTxPrepareFuture.java | 103 +++++++++----- .../dht/preloader/GridDhtPreloader.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 21 ++- .../near/GridNearTxPrepareFuture.java | 139 ++++--------------- .../cache/transactions/IgniteTxHandler.java | 73 ++++++---- .../IgniteCacheExpiryPolicyAbstractTest.java | 6 +- 12 files changed, 213 insertions(+), 329 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 835aa39..a5d39ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -272,14 +272,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.join(exchWorker, log); - exchFuts = null; - ResendTimeoutObject resendTimeoutObj = pendingResend.getAndSet(null); if (resendTimeoutObj != null) cctx.time().removeTimeoutObject(resendTimeoutObj); } + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + super.stop0(cancel); + + exchFuts = null; + } + public GridDhtPartitionTopology<K, V> clientTopology(int cacheId, GridDhtPartitionExchangeId exchId) { GridClientPartitionTopology<K, V> top = clientTops.get(cacheId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index bda5db0..257a331 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -21,6 +21,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.tostring.*; @@ -28,7 +29,6 @@ import org.jetbrains.annotations.*; import java.io.*; import java.util.*; -import java.util.concurrent.*; /** * Transaction node mapping. @@ -76,7 +76,7 @@ public class GridDistributedTxMapping<K, V> implements Externalizable { public GridDistributedTxMapping(ClusterNode node) { this.node = node; - entries = new ConcurrentLinkedQueue<>(); + entries = new GridConcurrentLinkedHashSet<>(); } /** @@ -271,7 +271,7 @@ public class GridDistributedTxMapping<K, V> implements Externalizable { */ private void ensureModifiable() { if (readOnly) { - entries = new ConcurrentLinkedQueue<>(entries); + entries = new GridConcurrentLinkedHashSet<>(entries); readOnly = false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index b5da263..efc3452 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -780,12 +780,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo } } - if (tx != null) { - tx.addDhtNodeEntryMapping(dhtMap); - tx.addNearNodeEntryMapping(nearMap); - + if (tx != null) tx.needsCompletedVersions(hasRmtNodes); - } if (isDone()) { if (log.isDebugEnabled()) @@ -878,8 +874,6 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo boolean invalidateRdr = e.readerId(n.id()) != null; - IgniteTxEntry<K, V> entry = tx != null ? tx.entry(e.txKey()) : null; - req.addDhtKey( e.key(), e.getOrMarshalKeyBytes(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index f6054c7..922e644 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; -import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.future.*; @@ -330,20 +329,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash()); - if (!tx.pessimistic()) { - int idx = 0; - - for (IgniteTxEntry<K, V> e : dhtMapping.writes()) - req.ttl(idx++, e.ttl()); - - if (nearMapping != null) { - idx = 0; - - for (IgniteTxEntry<K, V> e : nearMapping.writes()) - req.nearTtl(idx++, e.ttl()); - } - } - try { cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); @@ -395,13 +380,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash()); - if (!tx.pessimistic()) { - int idx = 0; - - for (IgniteTxEntry<K, V> e : nearMapping.writes()) - req.nearTtl(idx++, e.ttl()); - } - if (tx.onePhaseCommit()) req.writeVersion(tx.writeVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 395edd9..9c39c7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -18,10 +18,8 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -72,12 +70,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest @GridDirectVersion(2) private int taskNameHash; - /** TTLs for optimistic transaction. */ - private GridLongList ttls; - - /** Near cache TTLs for optimistic transaction. */ - private GridLongList nearTtls; - /** * Empty constructor required for {@link Externalizable}. */ @@ -228,56 +220,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers; } - /** - * @param idx Entry index. - * @param ttl TTL. - */ - public void ttl(int idx, long ttl) { - if (ttl != -1L) { - if (ttls == null) { - ttls = new GridLongList(); - - for (int i = 0; i < idx - 1; i++) - ttls.add(-1L); - } - } - - if (ttls != null) - ttls.add(ttl); - } - - /** - * @return TTLs for optimistic transaction. - */ - public GridLongList ttls() { - return ttls; - } - - /** - * @param idx Entry index. - * @param ttl TTL. - */ - public void nearTtl(int idx, long ttl) { - if (ttl != -1L) { - if (nearTtls == null) { - nearTtls = new GridLongList(); - - for (int i = 0; i < idx - 1; i++) - nearTtls.add(-1L); - } - } - - if (nearTtls != null) - nearTtls.add(ttl); - } - - /** - * @return TTLs for optimistic transaction. - */ - public GridLongList nearTtls() { - return nearTtls; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtTxFinishRequest.class, this, super.toString()); @@ -308,8 +250,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest _clone.writeVer = writeVer; _clone.subjId = subjId; _clone.taskNameHash = taskNameHash; - _clone.ttls = ttls; - _clone.nearTtls = nearTtls; } /** {@inheritDoc} */ @@ -347,12 +287,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; case 22: - if (!commState.putLongList(nearTtls)) - return false; - - commState.idx++; - - case 23: if (pendingVers != null) { if (commState.it == null) { if (!commState.putInt(pendingVers.size())) @@ -379,42 +313,35 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 24: + case 23: if (!commState.putBoolean(sysInvalidate)) return false; commState.idx++; - case 25: + case 24: if (!commState.putLong(topVer)) return false; commState.idx++; - case 26: - if (!commState.putLongList(ttls)) - return false; - - commState.idx++; - - case 27: + case 25: if (!commState.putCacheVersion(writeVer)) return false; commState.idx++; - case 28: + case 26: if (!commState.putUuid(subjId)) return false; commState.idx++; - case 29: + case 27: if (!commState.putInt(taskNameHash)) return false; commState.idx++; - } return true; @@ -460,16 +387,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; case 22: - GridLongList nearTtls0 = commState.getLongList(); - - if (nearTtls0 == LONG_LIST_NOT_READ) - return false; - - nearTtls = nearTtls0; - - commState.idx++; - - case 23: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -498,7 +415,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 24: + case 23: if (buf.remaining() < 1) return false; @@ -506,7 +423,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 25: + case 24: if (buf.remaining() < 8) return false; @@ -514,17 +431,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 26: - GridLongList ttls0 = commState.getLongList(); - - if (ttls0 == LONG_LIST_NOT_READ) - return false; - - ttls = ttls0; - - commState.idx++; - - case 27: + case 25: GridCacheVersion writeVer0 = commState.getCacheVersion(); if (writeVer0 == CACHE_VER_NOT_READ) @@ -534,7 +441,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 28: + case 26: UUID subjId0 = commState.getUuid(); if (subjId0 == UUID_NOT_READ) @@ -544,14 +451,13 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 29: + case 27: if (buf.remaining() < 4) return false; taskNameHash = commState.getInt(); commState.idx++; - } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 7bd05da..4077275 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -280,7 +280,15 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements if (optimistic()) { assert isSystemInvalidate(); - return prepareAsync(null, null, Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), 0, nearMiniId, null, true, + return prepareAsync( + null, + null, + Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), + 0, + nearMiniId, + null, + true, + null, null); } @@ -289,8 +297,15 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements if (fut == null) { // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, - Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), true, needReturnValue(), null))) + if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>( + cctx, + this, + nearMiniId, + Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), + true, + needReturnValue(), + null, + null))) return prepFut.get(); } else @@ -348,14 +363,17 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ - public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads, + public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync( + @Nullable Iterable<IgniteTxEntry<K, V>> reads, @Nullable Iterable<IgniteTxEntry<K, V>> writes, Map<IgniteTxKey<K>, GridCacheVersion> verMap, long msgId, IgniteUuid nearMiniId, Map<UUID, Collection<UUID>> txNodes, boolean last, - Collection<UUID> lastBackups) { + Collection<UUID> lastBackups, + IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture<K, V> fut = prepFut.get(); @@ -363,8 +381,15 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements init(); // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, verMap, last, - needReturnValue(), lastBackups))) { + if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>( + cctx, + this, + nearMiniId, + verMap, + last, + needReturnValue(), + lastBackups, + completeCb))) { GridDhtTxPrepareFuture<K, V> f = prepFut.get(); assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 903de41..406a589 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -124,6 +124,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** Locks ready flag. */ private volatile boolean locksReady; + /** */ + private IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb; + /** * Empty constructor required for {@link Externalizable}. */ @@ -146,7 +149,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, boolean last, boolean retVal, - Collection<UUID> lastBackups + Collection<UUID> lastBackups, + IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb ) { super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() { @Override public boolean collect(IgniteTxEx<K, V> e) { @@ -176,6 +180,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu this.retVal = retVal; + this.completeCb = completeCb; + assert dhtMap != null; assert nearMap != null; } @@ -273,13 +279,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridCacheEntryEx<K, V> cached = txEntry.cached(); - try { - if (txEntry.op() == CREATE || txEntry.op() == UPDATE && txEntry.drExpireTime() == -1L) { - ExpiryPolicy expiry = txEntry.expiry(); + ExpiryPolicy expiry = txEntry.expiry(); - if (expiry == null) - expiry = cacheCtx.expiry(); + if (expiry == null) + expiry = cacheCtx.expiry(); + try { + if (txEntry.op() == CREATE || txEntry.op() == UPDATE && txEntry.drExpireTime() == -1L) { if (expiry != null) { Duration duration = cached.hasValue() ? expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); @@ -342,6 +348,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) { + if (expiry != null) + txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess())); + txEntry.op(GridCacheOperation.NOOP); if (filterFailedKeys == null) @@ -385,10 +394,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu // U.error(log, "Failed to automatically rollback transaction: " + tx, ex); // } // - // If not local node. - if (!tx.nearNodeId().equals(cctx.localNodeId())) { + try { // Send reply back to near node. - GridCacheMessage<K, V> res = new GridNearTxPrepareResponse<>( + GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>( tx.nearXidVersion(), tx.nearFutureId(), nearMiniId, @@ -397,14 +405,10 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu ret, t); - try { - cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e); - - tx.rollbackAsync(); - } + sendPrepareResponse(res); + } + catch (IgniteCheckedException ignore) { + tx.rollbackAsync(); } onComplete(); @@ -521,20 +525,31 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu onComplete(); - if (!tx.colocated() && tx.markFinalizing(IgniteTxEx.FinalizationStatus.USER_FINISH)) { - IgniteInternalFuture<IgniteTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync(); + if (!tx.near()) { + if (tx.markFinalizing(IgniteTxEx.FinalizationStatus.USER_FINISH)) { + IgniteInternalFuture<IgniteTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync(); - fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteTx>>() { - @Override public void applyx(IgniteInternalFuture<IgniteTx> gridCacheTxGridFuture) { - try { - if (replied.compareAndSet(false, true)) - sendPrepareResponse(res); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); + fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteTx>>() { + @Override public void applyx(IgniteInternalFuture<IgniteTx> gridCacheTxGridFuture) { + try { + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send prepare response for transaction: " + tx, e); + } } - } - }); + }); + } + } + else { + try { + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send prepare response for transaction: " + tx, e); + } } return true; @@ -579,6 +594,11 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException { if (!tx.nearNodeId().equals(cctx.localNodeId())) cctx.io().send(tx.nearNodeId(), res); + else { + assert completeCb != null; + + completeCb.apply(res); + } } /** @@ -778,16 +798,16 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu boolean hasRemoteNodes = false; // Assign keys to primary nodes. - if (!F.isEmpty(reads)) { - for (IgniteTxEntry<K, V> read : reads) - hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap); - } - if (!F.isEmpty(writes)) { for (IgniteTxEntry<K, V> write : writes) hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap); } + if (!F.isEmpty(reads)) { + for (IgniteTxEntry<K, V> read : reads) + hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap); + } + tx.needsCompletedVersions(hasRemoteNodes); } @@ -990,12 +1010,23 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); - boolean ret; - GridCacheContext<K, V> cacheCtx = entry.context(); GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + ExpiryPolicy expiry = entry.expiry(); + + if (expiry == null) + expiry = cacheCtx.expiry(); + + if (expiry != null && entry.op() == READ) { + entry.op(NOOP); + + entry.ttl(CU.toTtl(expiry.getExpiryForAccess())); + } + + boolean ret; + while (true) { try { Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index d5e9714..6bb7c79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -119,7 +119,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { top = cctx.dht().topology(); - startFut = new GridFutureAdapter<>(cctx.kernalContext()); + startFut = new GridFutureAdapter<>(cctx.kernalContext(), false); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index b8bda46..e77689f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -873,9 +873,13 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { * @return Future that will be completed when locks are acquired. */ @SuppressWarnings("TypeMayBeWeakened") - public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsyncLocal(@Nullable Collection<IgniteTxEntry<K, V>> reads, - @Nullable Collection<IgniteTxEntry<K, V>> writes, Map<UUID, Collection<UUID>> txNodes, boolean last, - Collection<UUID> lastBackups) { + public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsyncLocal( + @Nullable Collection<IgniteTxEntry<K, V>> reads, + @Nullable Collection<IgniteTxEntry<K, V>> writes, + Map<UUID, Collection<UUID>> txNodes, boolean last, + Collection<UUID> lastBackups, + IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { if (state() != PREPARING) { if (timedOut()) return new GridFinishedFuture<>(cctx.kernalContext(), @@ -889,8 +893,15 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { init(); - GridDhtTxPrepareFuture<K, V> fut = new GridDhtTxPrepareFuture<>(cctx, this, IgniteUuid.randomUuid(), - Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), last, needReturnValue() && implicit(), lastBackups); + GridDhtTxPrepareFuture<K, V> fut = new GridDhtTxPrepareFuture<>( + cctx, + this, + IgniteUuid.randomUuid(), + Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), + last, + needReturnValue() && implicit(), + lastBackups, + completeCb); try { // At this point all the entries passed in must be enlisted in transaction because this is an http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index ee22b3e..fb422a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -586,7 +587,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut futId, tx.topologyVersion(), tx, - tx.optimistic() && tx.serializable() ? m.reads() : null, + m.reads(), m.writes(), /*grp lock key*/null, /*part lock*/false, @@ -605,63 +606,20 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut req.addDhtVersion(txEntry.txKey(), null); } - if (node.isLocal()) { - IgniteInternalFuture<IgniteTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(node.id(), tx, req); - - // Add new future. - add(new GridEmbeddedFuture<>( - cctx.kernalContext(), - fut, - new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() { - @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> t, Exception ex) { - if (ex != null) { - onError(node.id(), null, ex); - - return t; - } - - IgniteTxLocalEx<K, V> dhtTx = (IgniteTxLocalEx<K, V>)t; - - Collection<Integer> invalidParts = dhtTx.invalidPartitions(); - - assert F.isEmpty(invalidParts); - - if (!m.empty()) { - for (IgniteTxEntry<K, V> writeEntry : m.entries()) { - IgniteTxKey<K> key = writeEntry.txKey(); - - IgniteTxEntry<K, V> dhtTxEntry = dhtTx.entry(key); - - assert dhtTxEntry != null; - - if (dhtTxEntry.op() == NOOP) { - IgniteTxEntry<K, V> txEntry = tx.entry(key); - - assert txEntry != null; - - txEntry.op(NOOP); - } - } - - tx.addDhtVersion(m.node().id(), dhtTx.xidVersion()); + final MiniFuture fut = new MiniFuture(m, null); - m.dhtVersion(dhtTx.xidVersion()); - } + req.miniId(fut.futureId()); - tx.implicitSingleResult(dhtTx.implicitSingleResult()); + add(fut); - return tx; - } + if (node.isLocal()) { + cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse<K, V>>() { + @Override public void apply(GridNearTxPrepareResponse<K, V> res) { + fut.onResult(node.id(), res); } - )); + }); } else { - MiniFuture fut = new MiniFuture(m, null); - - req.miniId(fut.futureId()); - - add(fut); // Append new future. - try { cctx.io().send(node, req); } @@ -750,76 +708,27 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } } + final MiniFuture fut = new MiniFuture(m, mappings); + + req.miniId(fut.futureId()); + + add(fut); // Append new future. + // If this is the primary node for the keys. if (n.isLocal()) { - req.miniId(IgniteUuid.randomUuid()); - // At this point, if any new node joined, then it is // waiting for this transaction to complete, so // partition reassignments are not possible here. - IgniteInternalFuture<IgniteTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); - - // Add new future. - add(new GridEmbeddedFuture<>( - cctx.kernalContext(), - fut, - new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() { - @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> t, Exception ex) { - if (ex != null) { - onError(n.id(), mappings, ex); - - return t; - } - - IgniteTxLocalEx<K, V> dhtTx = (IgniteTxLocalEx<K, V>)t; - - Collection<Integer> invalidParts = dhtTx.invalidPartitions(); - - assert F.isEmpty(invalidParts); - - tx.implicitSingleResult(dhtTx.implicitSingleResult()); - - if (!m.empty()) { - for (IgniteTxEntry<K, V> writeEntry : m.entries()) { - IgniteTxKey<K> key = writeEntry.txKey(); - - IgniteTxEntry<K, V> dhtTxEntry = dhtTx.entry(key); - - if (dhtTxEntry.op() == NOOP) - tx.entry(key).op(NOOP); - } - - tx.addDhtVersion(m.node().id(), dhtTx.xidVersion()); - - m.dhtVersion(dhtTx.xidVersion()); - - GridCacheVersion min = dhtTx.minVersion(); - - IgniteTxManager<K, V> tm = cctx.tm(); - - if (m.near()) - tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(), - tm.committedVersions(min), tm.rolledbackVersions(min)); - } - - // Continue prepare before completing the future. - proceedPrepare(mappings); - - return tx; - } + cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse<K, V>>() { + @Override public void apply(GridNearTxPrepareResponse<K, V> res) { + fut.onResult(n.id(), res); } - )); + }); } else { assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx + ", nodeId=" + n.id() + ']'; - MiniFuture fut = new MiniFuture(m, mappings); - - req.miniId(fut.futureId()); - - add(fut); // Append new future. - try { cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } @@ -1054,6 +963,14 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut assert txEntry != null : "Missing tx entry for write key: " + key; txEntry.op(NOOP); + + ExpiryPolicy expiry = txEntry.expiry(); + + if (expiry == null) + expiry = txEntry.context().expiry(); + + if (expiry != null) + txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess())); } if (!m.empty()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 6a0d200..911516b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -23,7 +23,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -54,7 +54,7 @@ public class IgniteTxHandler<K, V> { public IgniteInternalFuture<IgniteTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId, final GridNearTxPrepareRequest<K, V> req) { - return prepareTx(nearNodeId, null, req); + return prepareTx(nearNodeId, null, req, null); } /** @@ -134,23 +134,29 @@ public class IgniteTxHandler<K, V> { * @param req Near prepare request. * @return Future for transaction. */ - public IgniteInternalFuture<IgniteTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx, - final GridNearTxPrepareRequest<K, V> req) { + public IgniteInternalFuture<IgniteTxEx<K, V>> prepareTx( + UUID nearNodeId, + @Nullable GridNearTxLocal<K, V> locTx, + GridNearTxPrepareRequest<K, V> req, + @Nullable IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { assert nearNodeId != null; assert req != null; if (locTx != null) { + assert completeCb != null; + if (req.near()) { // Make sure not to provide Near entries to DHT cache. req.cloneEntries(); - return prepareNearTx(nearNodeId, req); + return prepareNearTx(nearNodeId, req, completeCb); } else - return prepareColocatedTx(locTx, req); + return prepareColocatedTx(locTx, req, completeCb); } else - return prepareNearTx(nearNodeId, req); + return prepareNearTx(nearNodeId, req, null); } /** @@ -160,8 +166,11 @@ public class IgniteTxHandler<K, V> { * @param req Near prepare request. * @return Prepare future. */ - private IgniteInternalFuture<IgniteTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx, - final GridNearTxPrepareRequest<K, V> req) { + private IgniteInternalFuture<IgniteTxEx<K, V>> prepareColocatedTx( + final GridNearTxLocal<K, V> locTx, + final GridNearTxPrepareRequest<K, V> req, + final IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { IgniteInternalFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys. @@ -173,8 +182,13 @@ public class IgniteTxHandler<K, V> { if (ex != null) throw new GridClosureException(ex); - IgniteInternalFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(), - req.transactionNodes(), req.last(), req.lastBackups()); + IgniteInternalFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal( + req.reads(), + req.writes(), + req.transactionNodes(), + req.last(), + req.lastBackups(), + completeCb); if (locTx.isRollbackOnly()) locTx.rollbackAsync(); @@ -206,8 +220,11 @@ public class IgniteTxHandler<K, V> { * @param req Near prepare request. * @return Prepare future. */ - private IgniteInternalFuture<IgniteTxEx<K, V>> prepareNearTx(final UUID nearNodeId, - final GridNearTxPrepareRequest<K, V> req) { + private IgniteInternalFuture<IgniteTxEx<K, V>> prepareNearTx( + final UUID nearNodeId, + final GridNearTxPrepareRequest<K, V> req, + IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { ClusterNode nearNode = ctx.node(nearNodeId); if (nearNode == null) { @@ -286,9 +303,16 @@ public class IgniteTxHandler<K, V> { if (req.returnValue()) tx.needReturnValue(true); - IgniteInternalFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(), - req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(), - req.lastBackups()); + IgniteInternalFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync( + req.reads(), + req.writes(), + req.dhtVersions(), + req.messageId(), + req.miniId(), + req.transactionNodes(), + req.last(), + req.lastBackups(), + completeCb); if (tx.isRollbackOnly()) { try { @@ -722,10 +746,10 @@ public class IgniteTxHandler<K, V> { if (nearTx != null && nearTx.local()) nearTx = null; - finish(nodeId, dhtTx, req, req.ttls()); + finish(nodeId, dhtTx, req); if (nearTx != null) - finish(nodeId, nearTx, req, req.nearTtls()); + finish(nodeId, nearTx, req); if (dhtTx != null && !dhtTx.done()) { dhtTx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteTx>>() { @@ -742,13 +766,11 @@ public class IgniteTxHandler<K, V> { * @param nodeId Node ID. * @param tx Transaction. * @param req Request. - * @param ttls TTLs for optimistic transaction. */ protected void finish( UUID nodeId, IgniteTxRemoteEx<K, V> tx, - GridDhtTxFinishRequest<K, V> req, - @Nullable GridLongList ttls) { + GridDhtTxFinishRequest<K, V> req) { // We don't allow explicit locks for transactions and // therefore immediately return if transaction is null. // However, we may decide to relax this restriction in @@ -770,21 +792,12 @@ public class IgniteTxHandler<K, V> { log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req + ", tx=" + tx + ']'); - assert ttls == null || tx.concurrency() == OPTIMISTIC; - try { if (req.commit() || req.isSystemInvalidate()) { if (tx.commitVersion(req.commitVersion())) { tx.invalidate(req.isInvalidate()); tx.systemInvalidate(req.isSystemInvalidate()); - if (tx.concurrency() == OPTIMISTIC && ttls != null) { - int idx = 0; - - for (IgniteTxEntry<K, V> e : tx.writeEntries()) - e.ttl(ttls.get(idx)); - } - // Complete remote candidates. tx.doneRemote(req.baseVersion(), null, null, null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index c490156..4179310 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -885,7 +885,11 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs }, 3000); } - assertEquals("Unexpected ttl [grid=" + i + ", key=" + key +']', ttl, e.ttl()); + boolean primary = cache.entry(key).primary(); + boolean backup = cache.entry(key).backup(); + + assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e + + ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl()); if (ttl > 0) assertTrue(e.expireTime() > 0);