http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 8361423..5b011e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -63,9 +63,6 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements /** Near XID. */ private GridCacheVersion nearXidVer; - /** Transaction nodes mapping (primary node -> related backup nodes). */ - private Map<UUID, Collection<UUID>> txNodes; - /** Future. */ @GridToStringExclude private final AtomicReference<GridDhtTxPrepareFuture<K, V>> prepFut = @@ -153,11 +150,6 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements } /** {@inheritDoc} */ - @Override public Map<UUID, Collection<UUID>> transactionNodes() { - return txNodes; - } - - /** {@inheritDoc} */ @Override public UUID eventNodeId() { return nearNodeId; } @@ -201,6 +193,13 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements return nearFutId; } + /** + * @param nearFutId Near future ID. + */ + public void nearFutureId(IgniteUuid nearFutId) { + this.nearFutId = nearFutId; + } + /** {@inheritDoc} */ @Override protected IgniteUuid nearMiniId() { return nearMiniId; @@ -289,7 +288,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements if (fut == null) { // Future must be created before any exception can be thrown. if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, - Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), true, null))) + Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), true, needReturnValue(), null))) return prepFut.get(); } else @@ -355,8 +354,6 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements Map<UUID, Collection<UUID>> txNodes, boolean last, Collection<UUID> lastBackups) { - assert optimistic(); - // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture<K, V> fut = prepFut.get(); @@ -365,7 +362,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements // Future must be created before any exception can be thrown. if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, verMap, last, - lastBackups))) { + needReturnValue(), lastBackups))) { GridDhtTxPrepareFuture<K, V> f = prepFut.get(); assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 564100b..b78561a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -238,10 +238,10 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K } if (!F.isEmpty(dhtEntryMap)) - addDhtMapping(dhtEntryMap); + addDhtNodeEntryMapping(dhtEntryMap); if (!F.isEmpty(nearEntryMap)) - addNearMapping(nearEntryMap); + addNearNodeEntryMapping(nearEntryMap); mapped.set(true); } @@ -284,18 +284,31 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K /** * @param mappings Mappings to add. */ - void addDhtMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) { + void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) { addMapping(mappings, dhtMap); } /** * @param mappings Mappings to add. */ - void addNearMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) { + void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) { addMapping(mappings, nearMap); } /** + * @param mappings Mappings to add. + */ + public void addDhtMapping(Map<UUID, GridDistributedTxMapping<K, V>> mappings) { + addMapping0(mappings, dhtMap); + } + + /** + * @param mappings Mappings to add. + */ + public void addNearMapping(Map<UUID, GridDistributedTxMapping<K, V>> mappings) { + addMapping0(mappings, nearMap); + } + /** * @param nodeId Node ID. * @return {@code True} if mapping was removed. */ @@ -353,21 +366,25 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K /** * @param mappings Entry mappings. - * @param map Transaction mappings. + * @param dst Transaction mappings. */ - private void addMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings, - Map<UUID, GridDistributedTxMapping<K, V>> map) { + private void addMapping( + Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings, + Map<UUID, GridDistributedTxMapping<K, V>> dst + ) { for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapping : mappings.entrySet()) { ClusterNode n = mapping.getKey(); - for (GridDhtCacheEntry<K, V> entry : mapping.getValue()) { + GridDistributedTxMapping<K, V> m = dst.get(n.id()); + + List<GridDhtCacheEntry<K, V>> entries = mapping.getValue(); + + for (GridDhtCacheEntry<K, V> entry : entries) { IgniteTxEntry<K, V> txEntry = txMap.get(entry.txKey()); if (txEntry != null) { - GridDistributedTxMapping<K, V> m = map.get(n.id()); - if (m == null) - map.put(n.id(), m = new GridDistributedTxMapping<>(n)); + dst.put(n.id(), m = new GridDistributedTxMapping<>(n)); m.add(txEntry); } @@ -375,13 +392,31 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K } } + /** + * @param mappings Mappings to add. + * @param dst Map to add to. + */ + private void addMapping0( + Map<UUID, GridDistributedTxMapping<K, V>> mappings, + Map<UUID, GridDistributedTxMapping<K, V>> dst + ) { + for (Map.Entry<UUID, GridDistributedTxMapping<K, V>> entry : mappings.entrySet()) { + GridDistributedTxMapping<K, V> targetMapping = dst.get(entry.getKey()); + + if (targetMapping == null) + dst.put(entry.getKey(), entry.getValue()); + else { + for (IgniteTxEntry<K, V> txEntry : entry.getValue().entries()) + targetMapping.add(txEntry); + } + } + } /** {@inheritDoc} */ @Override public void addInvalidPartition(GridCacheContext<K, V> ctx, int part) { assert false : "DHT transaction encountered invalid partition [part=" + part + ", tx=" + this + ']'; } - /** * @param msgId Message ID. * @param e Entry to add. @@ -393,17 +428,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K IgniteTxState state = state(); - assert state == ACTIVE || (state == PREPARING && optimistic()) : "Invalid tx state for " + + assert state == PREPARING : "Invalid tx state for " + "adding entry [msgId=" + msgId + ", e=" + e + ", tx=" + this + ']'; e.unmarshal(cctx, false, cctx.deploy().globalLoader()); checkInternal(e.txKey()); - state = state(); - - assert state == ACTIVE || (state == PREPARING && optimistic()): "Invalid tx state for adding entry: " + e; - GridCacheContext<K, V> cacheCtx = e.context(); GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); @@ -419,6 +450,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K entry.ttl(e.ttl()); entry.filters(e.filters()); entry.drExpireTime(e.drExpireTime()); + entry.drVersion(e.drVersion()); } else { entry = e; @@ -471,23 +503,18 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K /** * @param cacheCtx Cache context. * @param entries Entries to lock. - * @param writeEntries Write entries for implicit transactions mapped to one node. * @param onePhaseCommit One phase commit flag. - * @param drVers DR versions. * @param msgId Message ID. - * @param implicit Implicit flag. * @param read Read flag. * @param accessTtl TTL for read operation. * @return Lock future. */ + @SuppressWarnings("ForLoopReplaceableByForEach") IgniteFuture<GridCacheReturn<V>> lockAllAsync( GridCacheContext<K, V> cacheCtx, - Collection<GridCacheEntryEx<K, V>> entries, - List<IgniteTxEntry<K, V>> writeEntries, + List<GridCacheEntryEx<K, V>> entries, boolean onePhaseCommit, - GridCacheVersion[] drVers, long msgId, - boolean implicit, final boolean read, long accessTtl ) { @@ -508,19 +535,16 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K onePhaseCommit(onePhaseCommit); try { - assert drVers == null || entries.size() == drVers.length; - Set<K> skipped = null; - int idx = 0; - int drVerIdx = 0; - long topVer = topologyVersion(); GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); // Enlist locks into transaction. - for (GridCacheEntryEx<K, V> entry : entries) { + for (int i = 0; i < entries.size(); i++) { + GridCacheEntryEx<K, V> entry = entries.get(i); + K key = entry.key(); IgniteTxEntry<K, V> txEntry = entry(entry.txKey()); @@ -533,9 +557,6 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K cached.unswap(!read, read); - IgniteTxEntry<K, V> - w = writeEntries == null ? null : writeEntries.get(idx++); - txEntry = addEntry(NOOP, null, null, @@ -546,22 +567,9 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K false, -1L, -1L, - drVers != null ? drVers[drVerIdx++] : null); - - if (w != null) { - assert key.equals(w.key()) : "Invalid entry [cached=" + cached + ", w=" + w + ']'; - - txEntry.op(w.op()); - txEntry.value(w.value(), w.hasWriteValue(), w.hasReadValue()); - txEntry.valueBytes(w.valueBytes()); - txEntry.drVersion(w.drVersion()); - txEntry.entryProcessors(w.entryProcessors()); - txEntry.ttl(w.ttl()); - txEntry.filters(w.filters()); - txEntry.drExpireTime(w.drExpireTime()); - txEntry.expiry(w.expiry()); - } - else if (read) + null); + + if (read) txEntry.ttl(accessTtl); txEntry.cached(cached, txEntry.keyBytes()); @@ -718,7 +726,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K } if (locNearMap != null) - addNearMapping(locNearMap); + addNearNodeEntryMapping(locNearMap); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index d7f8ef2..e2bb7e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.io.*; @@ -103,6 +104,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** IDs of backup nodes receiving last prepare request during this prepare. */ private Collection<UUID> lastBackups; + /** Needs return value flag. */ + private boolean retVal; + + /** Return value. */ + private GridCacheReturn<V> ret; + + /** Keys that did not pass the filter. */ + private Collection<IgniteTxKey<K>> filterFailedKeys; + /** * Empty constructor required for {@link Externalizable}. */ @@ -118,8 +128,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @param last {@code True} if this is last prepare operation for node. * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. */ - public GridDhtTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridDhtTxLocalAdapter<K, V> tx, - IgniteUuid nearMiniId, Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, boolean last, Collection<UUID> lastBackups) { + public GridDhtTxPrepareFuture( + GridCacheSharedContext<K, V> cctx, + final GridDhtTxLocalAdapter<K, V> tx, + IgniteUuid nearMiniId, + Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, + boolean last, + boolean retVal, + Collection<UUID> lastBackups + ) { super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() { @Override public boolean collect(IgniteTxEx<K, V> e) { return true; @@ -131,8 +148,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } }); - assert cctx != null; - this.cctx = cctx; this.tx = tx; this.dhtVerMap = dhtVerMap; @@ -148,6 +163,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu dhtMap = tx.dhtMap(); nearMap = tx.nearMap(); + this.retVal = retVal; + assert dhtMap != null; assert nearMap != null; } @@ -220,13 +237,24 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridCacheEntryEx<K, V> cached = txEntry.cached(); try { - // Don't compare entry against itself. - if (!cached.lockedLocally(tx.xidVersion())) { - if (log.isDebugEnabled()) - log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + cached + - ", tx=" + tx + ']'); + if (txEntry.explicitVersion() == null) { + // Don't compare entry against itself. + if (!cached.lockedLocally(tx.xidVersion())) { + if (log.isDebugEnabled()) + log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + + cached + ", tx=" + tx + ']'); + + return false; + } + } + else { + if (!cached.lockedBy(txEntry.explicitVersion())) { + if (log.isDebugEnabled()) + log.debug("Transaction entry is not locked by explicit version (will wait) [entry=" + + cached + ", tx=" + tx + ']'); - return false; + return false; + } } break; // While. @@ -261,6 +289,67 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } /** + * + */ + private void checkFilters() { + ret = new GridCacheReturn<>(null, true); + + for (IgniteTxEntry<K, V> txEntry : tx.optimisticLockEntries()) { + GridCacheContext<K, V> cacheCtx = txEntry.context(); + + GridCacheEntryEx<K, V> cached = txEntry.cached(); + + try { + boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters()); + + if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) { + cached.unswap(true, retVal); + + V val = cached.innerGet( + tx, + /*swap*/true, + /*read through*/retVal || hasFilters, + /*fail fast*/false, + /*unmarshal*/true, + /*metrics*/retVal, + /*event*/retVal, + /*tmp*/false, + null, + null, + null, + null, + null); + + if (retVal) + ret.value(val); + + if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) { + txEntry.op(GridCacheOperation.NOOP); + + if (filterFailedKeys == null) + filterFailedKeys = new ArrayList<>(); + + filterFailedKeys.add(cached.txKey()); + + ret.success(false); + } + else + ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue()); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get result value for cache entry: " + cached, e); + } + catch (GridCacheEntryRemovedException e) { + assert false : "Got entry removed exception while holding transactional lock on entry: " + e; + } + catch (GridCacheFilterFailedException e) { + assert false : "Got filter failed exception with fail fast false " + e; + } + } + } + + /** * @param t Error. */ public void onError(Throwable t) { @@ -281,8 +370,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu // If not local node. if (!tx.nearNodeId().equals(cctx.localNodeId())) { // Send reply back to near node. - GridCacheMessage<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(), tx.nearFutureId(), - nearMiniId, tx.xidVersion(), Collections.<Integer>emptySet(), t); + GridCacheMessage<K, V> res = new GridNearTxPrepareResponse<>( + tx.nearXidVersion(), + tx.nearFutureId(), + nearMiniId, + tx.xidVersion(), + Collections.<Integer>emptySet(), + ret, + t); try { cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); @@ -379,54 +474,105 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu this.err.compareAndSet(null, err); - if (replied.compareAndSet(false, true)) { - try { - // Must clear prepare future before response is sent or listeners are notified. - if (tx.optimistic()) - tx.clearPrepareFuture(this); + // Must clear prepare future before response is sent or listeners are notified. + if (tx.optimistic()) + tx.clearPrepareFuture(this); - if (!tx.nearNodeId().equals(cctx.localNodeId())) { - // Send reply back to originating near node. - GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(), - tx.nearFutureId(), nearMiniId, tx.xidVersion(), tx.invalidPartitions(), this.err.get()); + if (tx.onePhaseCommit()) { + assert last; - addDhtValues(res); + // Must create prepare response before transaction is committed to grab correct return value. + final GridNearTxPrepareResponse<K, V> res = createPrepareResponse(); - GridCacheVersion min = tx.minVersion(); + onComplete(); - res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min)); + if (!tx.colocated() && tx.markFinalizing(IgniteTxEx.FinalizationStatus.USER_FINISH)) { + IgniteFuture<IgniteTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync(); - res.pending(localDhtPendingVersions(tx.writeEntries(), min)); + fut.listenAsync(new CIX1<IgniteFuture<IgniteTx>>() { + @Override public void applyx(IgniteFuture<IgniteTx> gridCacheTxGridFuture) { + try { + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send prepare response for transaction: " + tx, e); + } + } + }); + } - cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + return true; + } + else { + if (replied.compareAndSet(false, true)) { + try { + sendPrepareResponse(createPrepareResponse()); + + return true; } + catch (IgniteCheckedException e) { + onError(e); - return true; + return true; + } + finally { + // Will call super.onDone(). + onComplete(); + } } - catch (IgniteCheckedException e) { - onError(e); + else { + // Other thread is completing future. Wait for it to complete. + try { + get(); + } + catch (IgniteInterruptedException e) { + onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e)); + } + catch (IgniteCheckedException ignored) { + // No-op, get() was just synchronization. + } - return true; - } - finally { - // Will call super.onDone(). - onComplete(); + return false; } } - else { - // Other thread is completing future. Wait for it to complete. - try { - get(); - } - catch (IgniteInterruptedException e) { - onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e)); - } - catch (IgniteCheckedException ignored) { - // No-op, get() was just synchronization. - } + } - return false; - } + /** + * @throws GridException If failed to send response. + */ + private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException { + if (!tx.nearNodeId().equals(cctx.localNodeId())) + cctx.io().send(tx.nearNodeId(), res); + } + + /** + * @return Prepare response. + */ + private GridNearTxPrepareResponse<K, V> createPrepareResponse() { + // Send reply back to originating near node. + GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>( + tx.nearXidVersion(), + tx.colocated() ? tx.xid() : tx.nearFutureId(), + nearMiniId == null ? tx.xid() : nearMiniId, + tx.xidVersion(), + tx.invalidPartitions(), + ret, + err.get()); + + addDhtValues(res); + + GridCacheVersion min = tx.minVersion(); + + res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min)); + + res.pending(localDhtPendingVersions(tx.writeEntries(), min)); + + res.filterFailedKeys(filterFailedKeys); + + tx.implicitSingleResult(ret); + + return res; } /** @@ -585,164 +731,135 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu return; try { - Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>(); - Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>(); - - boolean hasRemoteNodes = false; - - // Assign keys to primary nodes. - if (!F.isEmpty(reads)) { - for (IgniteTxEntry<K, V> read : reads) - hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap); - } - - if (!F.isEmpty(writes)) { - for (IgniteTxEntry<K, V> write : writes) - hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap); - } - - if (isDone()) - return; - - tx.needsCompletedVersions(hasRemoteNodes); - - // Create mini futures. - for (GridDistributedTxMapping<K, V> dhtMapping : futDhtMap.values()) { - assert !dhtMapping.empty(); - - ClusterNode n = dhtMapping.node(); - - assert !n.isLocal(); - - GridDistributedTxMapping<K, V> nearMapping = futNearMap.get(n.id()); - - MiniFuture fut = new MiniFuture(n.id(), dhtMap.get(n.id()), nearMap.get(n.id())); + // We are holding transaction-level locks for entries here, so we can get next write version. + tx.writeVersion(cctx.versions().next(tx.topologyVersion())); - add(fut); // Append new future. + checkFilters(); - Collection<IgniteTxEntry<K, V>> nearWrites = nearMapping == null ? null : nearMapping.writes(); + { + Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>(); + Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>(); - GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - dhtMapping.writes(), - nearWrites, - tx.groupLockKey(), - tx.partitionLock(), - txNodes, - tx.nearXidVersion(), - lastBackup(n.id()), - tx.subjectId(), - tx.taskNameHash()); + boolean hasRemoteNodes = false; - int idx = 0; + // Assign keys to primary nodes. + if (!F.isEmpty(reads)) { + for (IgniteTxEntry<K, V> read : reads) + hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap); + } - for (IgniteTxEntry<K, V> entry : dhtMapping.writes()) { - try { - GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); + if (!F.isEmpty(writes)) { + for (IgniteTxEntry<K, V> write : writes) + hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap); + } - GridCacheMvccCandidate<K> added = cached.candidate(version()); + tx.needsCompletedVersions(hasRemoteNodes); + } - assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; + if (isDone()) + return; - if (added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); + if (last) { + assert tx.transactionNodes() != null; - req.invalidateNearEntry(idx, cached.readerId(n.id()) != null); + // Create mini futures. + for (GridDistributedTxMapping<K, V> dhtMapping : tx.dhtMap().values()) { + assert !dhtMapping.empty(); - if (cached.isNewLocked()) - req.markKeyForPreload(idx); + ClusterNode n = dhtMapping.node(); - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } + assert !n.isLocal(); - idx++; - } + GridDistributedTxMapping<K, V> nearMapping = tx.nearMap().get(n.id()); - if (!F.isEmpty(nearWrites)) { - for (IgniteTxEntry<K, V> entry : nearWrites) { - try { - GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); - - assert added != null; - assert added.dhtLocal(); - - if (added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } - } - } + Collection<IgniteTxEntry<K, V>> nearWrites = nearMapping == null ? null : nearMapping.writes(); - //noinspection TryWithIdenticalCatches - try { - cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (ClusterTopologyException e) { - fut.onResult(e); - } - catch (IgniteCheckedException e) { - fut.onResult(e); - } - } + Collection<IgniteTxEntry<K, V>> dhtWrites = dhtMapping.writes(); - for (GridDistributedTxMapping<K, V> nearMapping : futNearMap.values()) { - if (!futDhtMap.containsKey(nearMapping.node().id())) { - assert nearMapping.writes() != null; + if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) + continue; - MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping); + MiniFuture fut = new MiniFuture(n.id(), dhtMap.get(n.id()), nearMap.get(n.id())); add(fut); // Append new future. + assert txNodes != null; + GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( futId, fut.futureId(), tx.topologyVersion(), tx, - null, - nearMapping.writes(), + dhtWrites, + nearWrites, tx.groupLockKey(), tx.partitionLock(), - null, + txNodes, tx.nearXidVersion(), - false, + true, + tx.onePhaseCommit(), + lastBackup(n.id()), tx.subjectId(), tx.taskNameHash()); - for (IgniteTxEntry<K, V> entry : nearMapping.writes()) { + int idx = 0; + + for (IgniteTxEntry<K, V> entry : dhtWrites) { try { - GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); + GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); - assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate<K> added = cached.candidate(version()); - if (added != null && added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); + assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; + + if (added != null && added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + } + + // Do not invalidate near entry on originating transaction node. + req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) && + cached.readerId(n.id()) != null); + + if (cached.isNewLocked()) + req.markKeyForPreload(idx); break; } catch (GridCacheEntryRemovedException ignore) { assert false : "Got removed exception on entry with dht local candidate: " + entry; } + + idx++; + } + + if (!F.isEmpty(nearWrites)) { + for (IgniteTxEntry<K, V> entry : nearWrites) { + try { + GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); + + assert added != null; + assert added.dhtLocal(); + + if (added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } + } } + assert req.transactionNodes() != null; + //noinspection TryWithIdenticalCatches try { - cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException e) { fut.onResult(e); @@ -751,6 +868,64 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu fut.onResult(e); } } + + for (GridDistributedTxMapping<K, V> nearMapping : tx.nearMap().values()) { + if (!tx.dhtMap().containsKey(nearMapping.node().id())) { + assert nearMapping.writes() != null; + + MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping); + + add(fut); // Append new future. + + GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + null, + nearMapping.writes(), + tx.groupLockKey(), + tx.partitionLock(), + tx.transactionNodes(), + tx.nearXidVersion(), + true, + tx.onePhaseCommit(), + tx.subjectId(), + tx.taskNameHash()); + + for (IgniteTxEntry<K, V> entry : nearMapping.writes()) { + try { + GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); + + assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; + + if (added != null && added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } + } + + assert req.transactionNodes() != null; + + //noinspection TryWithIdenticalCatches + try { + cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + } + catch (ClusterTopologyException e) { + fut.onResult(e); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } + } + } } } finally { @@ -976,8 +1151,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu else { // Process evicted readers (no need to remap). if (nearMapping != null && !F.isEmpty(res.nearEvicted())) { - nearMapping.evictReaders(res.nearEvicted()); - for (IgniteTxEntry<K, V> entry : nearMapping.entries()) { if (res.nearEvicted().contains(entry.txKey())) { while (true) { @@ -999,6 +1172,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } } } + + nearMapping.evictReaders(res.nearEvicted()); } // Process invalid partitions (no need to remap). http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 5430e53..31e563c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -109,6 +109,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque * @param txNodes Transaction nodes mapping. * @param nearXidVer Near transaction ID. * @param last {@code True} if this is last prepare request for node. + * @param onePhaseCommit One phase commit flag. */ public GridDhtTxPrepareRequest( IgniteUuid futId, @@ -122,9 +123,10 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque Map<UUID, Collection<UUID>> txNodes, GridCacheVersion nearXidVer, boolean last, + boolean onePhaseCommit, UUID subjId, int taskNameHash) { - super(tx, null, dhtWrites, grpLockKey, partLock, txNodes); + super(tx, null, dhtWrites, grpLockKey, partLock, txNodes, onePhaseCommit); assert futId != null; assert miniId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 2ed6262..e260b63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -50,9 +50,6 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> /** Near transaction ID. */ private GridCacheVersion nearXidVer; - /** Transaction nodes mapping (primary node -> related backup nodes). */ - private Map<UUID, Collection<UUID>> txNodes; - /** * Empty constructor required for {@link Externalizable}. */ @@ -206,11 +203,6 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> return nearXidVer; } - /** {@inheritDoc} */ - @Override public Map<UUID, Collection<UUID>> transactionNodes() { - return txNodes; - } - /** * @return Near node ID. */ @@ -289,7 +281,6 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param keyBytes Key bytes. * @param val Value. * @param valBytes Value bytes. - * @param drVer Data center replication version. * @param entryProcessors Entry processors. * @param ttl TTL. */ @@ -300,7 +291,6 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> @Nullable V val, @Nullable byte[] valBytes, @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors, - @Nullable GridCacheVersion drVer, long ttl) { checkInternal(key); @@ -316,7 +306,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> ttl, -1L, cached, - drVer); + null); txEntry.keyBytes(keyBytes); txEntry.valueBytes(valBytes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 8d0b009..a795f42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -439,30 +439,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? null : t); } - /** - * @param cached Entry to check. - * @return {@code True} if filter passed. - */ - private boolean filter(GridCacheEntryEx<K, V> cached) { - try { - if (!cctx.isAll(cached, filter)) { - if (log.isDebugEnabled()) - log.debug("Filter didn't pass for entry (will fail lock): " + cached); - - onFailed(true); - - return false; - } - - return true; - } - catch (IgniteCheckedException e) { - onError(e); - - return false; - } - } - /** {@inheritDoc} */ @Override public boolean cancel() { if (onCancelled()) @@ -750,18 +726,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity distributedKeys.add(key); - if (inTx() && implicitTx() && mappings.size() == 1 && !cctx.writeThrough()) { - tx.onePhaseCommit(true); - - req.onePhaseCommit(true); - } - - IgniteTxEntry<K, V> writeEntry = tx != null ? tx.writeMap().get(txKey) : null; - - if (writeEntry != null) - // We are sending entry to remote node, clear transfer flag. - writeEntry.transferRequired(false); - if (tx != null) tx.addKeyMapping(txKey, mapping.node()); @@ -770,8 +734,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity node.isLocal() ? null : entry.getOrMarshalKeyBytes(), retval, dhtVer, // Include DHT version to match remote DHT entry. - writeEntry, - inTx() ? tx.entry(txKey).drVersion() : null, cctx); } @@ -1011,9 +973,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (tx != null) { for (K key : distributedKeys) tx.addKeyMapping(cctx.txKey(key), cctx.localNode()); - - if (tx.implicit() && !cctx.writeThrough()) - tx.onePhaseCommit(true); } lockLocally(distributedKeys, topVer, null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 2409335..29f3622 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -874,8 +874,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B distributedKeys.add(key); - IgniteTxEntry<K, V> writeEntry = tx != null ? tx.writeMap().get(txKey) : null; - if (tx != null) tx.addKeyMapping(txKey, mapping.node()); @@ -884,13 +882,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B node.isLocal() ? null : entry.getOrMarshalKeyBytes(), retval && dhtVer == null, dhtVer, // Include DHT version to match remote DHT entry. - writeEntry, - inTx() ? tx.entry(txKey).drVersion() : null, cctx); - - // Clear transfer required flag since we are sending message. - if (writeEntry != null) - writeEntry.transferRequired(false); } if (cand.reentry()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 37b1b21..d9cb028 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -276,8 +276,6 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> * @param retVal Flag indicating whether value should be returned. * @param keyBytes Key bytes. * @param dhtVer DHT version. - * @param writeEntry Write entry if implicit transaction mapped on one node. - * @param drVer DR version. * @param ctx Context. * @throws IgniteCheckedException If failed. */ @@ -286,14 +284,12 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> byte[] keyBytes, boolean retVal, @Nullable GridCacheVersion dhtVer, - @Nullable IgniteTxEntry<K, V> writeEntry, - @Nullable GridCacheVersion drVer, GridCacheContext<K, V> ctx ) throws IgniteCheckedException { dhtVers[idx] = dhtVer; // Delegate to super. - addKeyBytes(key, keyBytes, writeEntry, retVal, null, drVer, ctx); + addKeyBytes(key, keyBytes, retVal, (Collection<GridCacheMvccCandidate<K>>)null, ctx); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 9942e68..62421f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -263,7 +263,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> byte[] bytes = !keyBytes.isEmpty() ? keyBytes.get(i) : null; Collection<GridCacheMvccCandidate<K>> cands = req.candidatesByIndex(i); - GridCacheVersion drVer = req.drVersionByIndex(i); if (log.isDebugEnabled()) log.debug("Unmarshalled key: " + key); @@ -312,7 +311,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> } tx.addEntry(ctx, txKey, bytes, GridCacheOperation.NOOP, /*Value.*/null, - /*Value byts.*/null, drVer); + /*Value byts.*/null, /*dr version*/null); } // Add remote candidate before reordering. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index f63103e..0f7aed3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -357,8 +357,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu null, null, tx.size(), - commit && tx.pessimistic() ? m.writes() : null, - commit && tx.pessimistic() ? F.view(tx.writeEntries(), CU.<K, V>transferRequired()) : null, tx.subjectId(), tx.taskNameHash() ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index b84e724..48f72e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -78,8 +78,6 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. * @param txSize Expected transaction size. - * @param writeEntries Write entries. - * @param recoverEntries Recover entries. */ public GridNearTxFinishRequest( IgniteUuid futId, @@ -97,12 +95,10 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, int txSize, - Collection<IgniteTxEntry<K, V>> writeEntries, - Collection<IgniteTxEntry<K, V>> recoverEntries, @Nullable UUID subjId, int taskNameHash) { super(xidVer, futId, null, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, committedVers, - rolledbackVers, txSize, writeEntries, recoverEntries, null); + rolledbackVers, txSize, null); this.explicitLock = explicitLock; this.storeEnabled = storeEnabled; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f790bee..21e804d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -374,11 +374,6 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { return mappings; } - /** {@inheritDoc} */ - @Override public Collection<IgniteTxEntry<K, V>> recoveryWrites() { - return F.view(writeEntries(), CU.<K, V>transferRequired()); - } - /** * @param nodeId Node ID. * @param dhtVer DHT version. @@ -499,7 +494,6 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { * @param mapQueue Mappings queue. */ void removeKeysMapping(UUID failedNodeId, Iterable<GridDistributedTxMapping<K, V>> mapQueue) { - assert optimistic(); assert failedNodeId != null; assert mapQueue != null; @@ -572,9 +566,10 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { try { // Handle explicit locks. - GridCacheVersion base = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer; + GridCacheVersion explicit = txEntry.explicitVersion(); - entry.readyNearLock(base, mapping.dhtVersion(), committedVers, rolledbackVers, pendingVers); + if (explicit == null) + entry.readyNearLock(xidVer, mapping.dhtVersion(), committedVers, rolledbackVers, pendingVers); break; } @@ -694,12 +689,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() { - IgniteFuture<IgniteTxEx<K, V>> fut = prepFut.get(); + GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, V>)prepFut.get(); if (fut == null) { // Future must be created before any exception can be thrown. - fut = pessimistic() ? new PessimisticPrepareFuture<>(cctx.kernalContext(), this) : - new GridNearTxPrepareFuture<>(cctx, this); + fut = new GridNearTxPrepareFuture<>(cctx, this); if (!prepFut.compareAndSet(null, fut)) return prepFut.get(); @@ -713,19 +707,17 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { // For pessimistic mode we don't distribute prepare request and do not lock topology version // as it was fixed on first lock. if (pessimistic()) { - PessimisticPrepareFuture<K, V> pessimisticFut = (PessimisticPrepareFuture<K, V>)fut; - if (!state(PREPARING)) { if (setRollbackOnly()) { if (timedOut()) - pessimisticFut.onError(new IgniteTxTimeoutException("Transaction timed out and was " + + fut.onError(new IgniteTxTimeoutException("Transaction timed out and was " + "rolled back: " + this)); else - pessimisticFut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + + fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + ", tx=" + this + ']')); } else - pessimisticFut.onError(new IgniteTxRollbackException("Invalid transaction state for prepare " + + fut.onError(new IgniteTxRollbackException("Invalid transaction state for prepare " + "[state=" + state() + ", tx=" + this + ']')); return fut; @@ -734,26 +726,18 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { try { userPrepare(); - if (!state(PREPARED)) { - setRollbackOnly(); - - pessimisticFut.onError(new IgniteCheckedException("Invalid transaction state for commit [state=" + - state() + ", tx=" + this + ']')); + // Make sure to add future before calling prepare. + cctx.mvcc().addFuture(fut); - return fut; - } - - pessimisticFut.complete(); + fut.prepare(); } catch (IgniteCheckedException e) { - pessimisticFut.onError(e); + fut.onError(e); } } else { // In optimistic mode we must wait for topology map update. - GridNearTxPrepareFuture<K, V> pf = (GridNearTxPrepareFuture<K, V>)prepFut.get(); - - pf.prepare(); + fut.prepare(); } return fut; @@ -892,8 +876,6 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { public IgniteFuture<IgniteTxEx<K, V>> prepareAsyncLocal(@Nullable Collection<IgniteTxEntry<K, V>> reads, @Nullable Collection<IgniteTxEntry<K, V>> writes, Map<UUID, Collection<UUID>> txNodes, boolean last, Collection<UUID> lastBackups) { - assert optimistic(); - if (state() != PREPARING) { if (timedOut()) return new GridFinishedFuture<>(cctx.kernalContext(), @@ -908,7 +890,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { init(); GridDhtTxPrepareFuture<K, V> fut = new GridDhtTxPrepareFuture<>(cctx, this, IgniteUuid.randomUuid(), - Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), last, lastBackups); + Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), last, needReturnValue() && implicit(), lastBackups); try { // At this point all the entries passed in must be enlisted in transaction because this is an http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 4b91ff4..e6dc3e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -255,6 +255,13 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } /** + * @param e Error. + */ + void onError(Throwable e) { + onError(null, null, e); + } + + /** * @param nodeId Sender. * @param res Result. */ @@ -323,65 +330,69 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * Waits for topology exchange future to be ready and then prepares user transaction. */ public void prepare() { - GridDhtTopologyFuture topFut = topologyReadLock(); + if (tx.optimistic()) { + GridDhtTopologyFuture topFut = topologyReadLock(); - try { - if (topFut.isDone()) { - try { - if (!tx.state(PREPARING)) { - if (tx.setRollbackOnly()) { - if (tx.timedOut()) - onError(null, null, new IgniteTxTimeoutException("Transaction timed out and " + - "was rolled back: " + this)); + try { + if (topFut.isDone()) { + try { + if (!tx.state(PREPARING)) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onError(null, null, new IgniteTxTimeoutException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } else - onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + - "[state=" + tx.state() + ", tx=" + this + ']')); + onError(null, null, new IgniteTxRollbackException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); + + return; } - else - onError(null, null, new IgniteTxRollbackException("Invalid transaction state for " + - "prepare [state=" + tx.state() + ", tx=" + this + ']')); - return; - } + GridDiscoveryTopologySnapshot snapshot = topFut.topologySnapshot(); - GridDiscoveryTopologySnapshot snapshot = topFut.topologySnapshot(); + tx.topologyVersion(snapshot.topologyVersion()); + tx.topologySnapshot(snapshot); - tx.topologyVersion(snapshot.topologyVersion()); - tx.topologySnapshot(snapshot); + // Make sure to add future before calling prepare. + cctx.mvcc().addFuture(this); - // Make sure to add future before calling prepare. - cctx.mvcc().addFuture(this); + prepare0(); + } + catch (IgniteTxTimeoutException | IgniteTxOptimisticException e) { + onError(cctx.localNodeId(), null, e); + } + catch (IgniteCheckedException e) { + tx.setRollbackOnly(); - prepare0(); - } - catch (IgniteTxTimeoutException | IgniteTxOptimisticException e) { - onError(cctx.localNodeId(), null, e); - } - catch (IgniteCheckedException e) { - tx.setRollbackOnly(); + String msg = "Failed to prepare transaction (will attempt rollback): " + this; - String msg = "Failed to prepare transaction (will attempt rollback): " + this; + U.error(log, msg, e); - U.error(log, msg, e); + tx.rollbackAsync(); - tx.rollbackAsync(); + onError(null, null, new IgniteTxRollbackException(msg, e)); + } + } + else { + topFut.syncNotify(false); - onError(null, null, new IgniteTxRollbackException(msg, e)); + topFut.listenAsync(new CI1<IgniteFuture<Long>>() { + @Override public void apply(IgniteFuture<Long> t) { + prepare(); + } + }); } } - else { - topFut.syncNotify(false); - - topFut.listenAsync(new CI1<IgniteFuture<Long>>() { - @Override public void apply(IgniteFuture<Long> t) { - prepare(); - } - }); + finally { + topologyReadUnlock(); } } - finally { - topologyReadUnlock(); - } + else + preparePessimistic(); } /** @@ -472,22 +483,24 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut assert topVer > 0; - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyException("Failed to map keys for cache (all " + - "partition nodes left the grid): " + cacheCtx.name())); - - return; - } - } - txMapping = new GridDhtTxMapping<>(); ConcurrentLinkedDeque8<GridDistributedTxMapping<K, V>> mappings = new ConcurrentLinkedDeque8<>(); + if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { + onDone(new ClusterTopologyException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name())); + + return; + } + } + } + // Assign keys to primary nodes. GridDistributedTxMapping<K, V> cur = null; @@ -538,10 +551,169 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut txMapping.initLast(mappings); + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + proceedPrepare(mappings); } /** + * + */ + private void preparePessimistic() { + Map<ClusterNode, GridDistributedTxMapping<K, V>> mappings = new HashMap<>(); + + long topVer = tx.topologyVersion(); + + txMapping = new GridDhtTxMapping<>(); + + for (IgniteTxEntry<K, V> txEntry : tx.allEntries()) { + GridCacheContext<K, V> cacheCtx = txEntry.context(); + + List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer); + + ClusterNode primary = F.first(nodes); + + GridDistributedTxMapping<K, V> nodeMapping = mappings.get(primary); + + if (nodeMapping == null) { + nodeMapping = new GridDistributedTxMapping<>(primary); + + mappings.put(primary, nodeMapping); + } + + txEntry.nodeId(primary.id()); + + nodeMapping.add(txEntry); + + txMapping.addMapping(nodes); + } + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + for (final GridDistributedTxMapping<K, V> m : mappings.values()) { + final ClusterNode node = m.node(); + + GridNearTxPrepareRequest<K, V> req = new GridNearTxPrepareRequest<>( + futId, + tx.topologyVersion(), + tx, + tx.optimistic() && tx.serializable() ? m.reads() : null, + m.writes(), + /*grp lock key*/null, + /*part lock*/false, + tx.syncCommit(), + tx.syncRollback(), + txMapping.transactionNodes(), + true, + txMapping.transactionNodes().get(node.id()), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + tx.subjectId(), + tx.taskNameHash()); + + for (IgniteTxEntry<K, V> txEntry : m.writes()) { + assert txEntry.cached().detached() : "Expected detached entry while preparing transaction " + + "[locNodeId=" + cctx.localNodeId() + + ", txEntry=" + txEntry + ']'; + + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + if (node.isLocal()) { + IgniteFuture<IgniteTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(node.id(), tx, req); + + // Add new future. + add(new GridEmbeddedFuture<>( + cctx.kernalContext(), + fut, + new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() { + @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> t, Exception ex) { + if (ex != null) { + onError(node.id(), null, ex); + + return t; + } + + IgniteTxLocalEx<K, V> dhtTx = (IgniteTxLocalEx<K, V>)t; + + Collection<Integer> invalidParts = dhtTx.invalidPartitions(); + + assert F.isEmpty(invalidParts); + + if (!m.empty()) { + for (IgniteTxEntry<K, V> writeEntry : m.entries()) { + IgniteTxKey<K> key = writeEntry.txKey(); + + IgniteTxEntry<K, V> dhtTxEntry = dhtTx.entry(key); + + if (dhtTxEntry.op() == NOOP) + tx.entry(key).op(NOOP); + } + + tx.addDhtVersion(m.node().id(), dhtTx.xidVersion()); + + m.dhtVersion(dhtTx.xidVersion()); + + GridCacheVersion min = dhtTx.minVersion(); + + IgniteTxManager<K, V> tm = cctx.near().dht().context().tm(); + + tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(), + tm.committedVersions(min), tm.rolledbackVersions(min)); + } + + tx.implicitSingleResult(dhtTx.implicitSingleResult()); + + return tx; + } + } + )); + } + else { + MiniFuture fut = new MiniFuture(m, null); + + req.miniId(fut.futureId()); + + add(fut); // Append new future. + + try { + cctx.io().send(node, req); + } + catch (IgniteCheckedException e) { + // Fail the whole thing. + fut.onResult(e); + } + } + } + + markInitialized(); + } + + /** + * Checks if mapped transaction can be committed on one phase. + * One-phase commit can be done if transaction maps to one primary node and not more than one backup. + */ + private void checkOnePhase() { + if (cctx.isStoreEnabled()) + return; + + Map<UUID, Collection<UUID>> map = txMapping.transactionNodes(); + + if (map.size() == 1) { + Collection<UUID> backups = F.firstEntry(map).getValue(); + + if (backups.size() <= 1) + tx.onePhaseCommit(true); + } + } + + /** * Continues prepare after previous mapping successfully finished. * * @param mappings Queue of mappings. @@ -571,6 +743,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut txMapping.transactionNodes(), m.last(), m.lastBackups(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), tx.subjectId(), tx.taskNameHash()); @@ -618,7 +793,18 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut assert F.isEmpty(invalidParts); + tx.implicitSingleResult(dhtTx.implicitSingleResult()); + if (!m.empty()) { + for (IgniteTxEntry<K, V> writeEntry : m.entries()) { + IgniteTxKey<K> key = writeEntry.txKey(); + + IgniteTxEntry<K, V> dhtTxEntry = dhtTx.entry(key); + + if (dhtTxEntry.op() == NOOP) + tx.entry(key).op(NOOP); + } + tx.addDhtVersion(m.node().id(), dhtTx.xidVersion()); m.dhtVersion(dhtTx.xidVersion()); @@ -872,6 +1058,17 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } } + if (tx.implicitSingle()) + tx.implicitSingleResult(res.returnValue()); + + for (IgniteTxKey<K> key : res.filterFailedKeys()) { + IgniteTxEntry<K, V> txEntry = tx.entry(key); + + assert txEntry != null : "Missing tx entry for write key: " + key; + + txEntry.op(NOOP); + } + if (!m.empty()) { // Register DHT version. tx.addDhtVersion(m.node().id(), res.dhtVersion()); @@ -882,7 +1079,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } // Proceed prepare before finishing mini future. - proceedPrepare(mappings); + if (mappings != null) + proceedPrepare(mappings); // Finish this mini future. onDone(tx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 8aecfe0..045bcf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.lang.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.direct.*; @@ -55,8 +56,15 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ /** IDs of backup nodes receiving last prepare request during this prepare. */ @GridDirectCollection(UUID.class) + @GridToStringInclude private Collection<UUID> lastBackups; + /** Need return value flag. */ + private boolean retVal; + + /** Implicit single flag. */ + private boolean implicitSingle; + /** Subject ID. */ @GridDirectVersion(1) private UUID subjId; @@ -99,10 +107,13 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ Map<UUID, Collection<UUID>> txNodes, boolean last, Collection<UUID> lastBackups, + boolean onePhaseCommit, + boolean retVal, + boolean implicitSingle, @Nullable UUID subjId, int taskNameHash ) { - super(tx, reads, writes, grpLockKey, partLock, txNodes); + super(tx, reads, writes, grpLockKey, partLock, txNodes, onePhaseCommit); assert futId != null; @@ -111,6 +122,8 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ this.near = near; this.last = last; this.lastBackups = lastBackups; + this.retVal = retVal; + this.implicitSingle = implicitSingle; this.subjId = subjId; this.taskNameHash = taskNameHash; } @@ -172,6 +185,20 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ } /** + * @return Whether return value is requested. + */ + public boolean returnValue() { + return retVal; + } + + /** + * @return Implicit single flag. + */ + public boolean implicitSingle() { + return implicitSingle; + } + + /** * @return Topology version. */ @Override public long topologyVersion() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 7024e70..8cd1f12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -69,6 +69,20 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes @GridDirectCollection(byte[].class) private Collection<byte[]> ownedValsBytes; + /** Cache return value. */ + @GridDirectTransient + private GridCacheReturn<V> retVal; + + /** Return value bytes. */ + private byte[] retValBytes; + + /** Filter failed keys. */ + @GridDirectTransient + private Collection<K> filterFailedKeys; + + /** Filter failed key bytes. */ + private byte[] filterFailedKeyBytes; + /** * Empty constructor required by {@link Externalizable}. */ @@ -84,8 +98,15 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes * @param invalidParts Invalid partitions. * @param err Error. */ - public GridNearTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, GridCacheVersion dhtVer, - Collection<Integer> invalidParts, Throwable err) { + public GridNearTxPrepareResponse( + GridCacheVersion xid, + IgniteUuid futId, + IgniteUuid miniId, + GridCacheVersion dhtVer, + Collection<Integer> invalidParts, + GridCacheReturn<V> retVal, + Throwable err + ) { super(xid, err); assert futId != null; @@ -96,6 +117,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes this.miniId = miniId; this.dhtVer = dhtVer; this.invalidParts = invalidParts; + this.retVal = retVal; } /** @@ -146,6 +168,9 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes * @param valBytes Value bytes. */ public void addOwnedValue(IgniteTxKey<K> key, GridCacheVersion ver, V val, byte[] valBytes) { + if (val == null && valBytes == null) + return; + if (ownedVals == null) ownedVals = new HashMap<>(); @@ -161,6 +186,27 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes } /** + * @return Return value. + */ + public GridCacheReturn<V> returnValue() { + return retVal; + } + + /** + * @param filterFailedKeys Collection of keys that did not pass the filter. + */ + public void filterFailedKeys(Collection<K> filterFailedKeys) { + this.filterFailedKeys = filterFailedKeys; + } + + /** + * @return Collection of keys that did not pass the filter. + */ + public Collection<K> filterFailedKeys() { + return filterFailedKeys == null ? Collections.<K>emptyList() : filterFailedKeys; + } + + /** * @param key Key. * @return {@code True} if response has owned value for given key. */ @@ -203,6 +249,13 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes ownedValsBytes.add(ctx.marshaller().marshal(F.t(entry.getKey(), tup.get1(), valBytes, rawBytes))); } } + + + if (retValBytes == null && retVal != null) + retValBytes = ctx.marshaller().marshal(retVal); + + if (filterFailedKeyBytes == null && filterFailedKeys != null) + filterFailedKeyBytes = ctx.marshaller().marshal(filterFailedKeys); } /** {@inheritDoc} */ @@ -220,6 +273,12 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes ownedVals.put(tup.get1(), F.t(tup.get2(), val, tup.get4() ? null : tup.get3())); } } + + if (retVal == null && retValBytes != null) + retVal = ctx.marshaller().unmarshal(retValBytes, ldr); + + if (filterFailedKeys == null && filterFailedKeyBytes != null) + filterFailedKeys = ctx.marshaller().unmarshal(filterFailedKeyBytes, ldr); } /** {@inheritDoc} */