http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java new file mode 100644 index 0000000..8669598 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -0,0 +1,1074 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.dr.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.transactions.IgniteTxState.*; +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + +/** + * + */ +public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteTxEx<K, V>> + implements GridCacheMvccFuture<K, V, IgniteTxEx<K, V>> { + /** */ + private static final long serialVersionUID = 0L; + + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Context. */ + private GridCacheSharedContext<K, V> cctx; + + /** Future ID. */ + private IgniteUuid futId; + + /** Transaction. */ + @GridToStringExclude + private GridDhtTxLocalAdapter<K, V> tx; + + /** Near mappings. */ + private Map<UUID, GridDistributedTxMapping<K, V>> nearMap; + + /** DHT mappings. */ + private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap; + + /** Logger. */ + private IgniteLogger log; + + /** Error. */ + private AtomicReference<Throwable> err = new AtomicReference<>(null); + + /** Replied flag. 
*/ + private AtomicBoolean replied = new AtomicBoolean(false); + + /** All replies flag. */ + private AtomicBoolean mapped = new AtomicBoolean(false); + + /** Prepare reads. */ + private Iterable<IgniteTxEntry<K, V>> reads; + + /** Prepare writes. */ + private Iterable<IgniteTxEntry<K, V>> writes; + + /** Tx nodes. */ + private Map<UUID, Collection<UUID>> txNodes; + + /** Trackable flag. */ + private boolean trackable = true; + + /** Near mini future id. */ + private IgniteUuid nearMiniId; + + /** DHT versions map. */ + private Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap; + + /** {@code True} if this is last prepare operation for node. */ + private boolean last; + + /** IDs of backup nodes receiving last prepare request during this prepare. */ + private Collection<UUID> lastBackups; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtTxPrepareFuture() { + // No-op. + } + + /** + * @param cctx Context. + * @param tx Transaction. + * @param nearMiniId Near mini future id. + * @param dhtVerMap DHT versions map. + * @param last {@code True} if this is last prepare operation for node. + * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. + */ + public GridDhtTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridDhtTxLocalAdapter<K, V> tx, + IgniteUuid nearMiniId, Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, boolean last, Collection<UUID> lastBackups) { + super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() { + @Override public boolean collect(IgniteTxEx<K, V> e) { + return true; + } + + @Override public IgniteTxEx<K, V> reduce() { + // Nothing to aggregate. 
+ return tx; + } + }); + + assert cctx != null; + + this.cctx = cctx; + this.tx = tx; + this.dhtVerMap = dhtVerMap; + this.last = last; + this.lastBackups = lastBackups; + + futId = IgniteUuid.randomUuid(); + + this.nearMiniId = nearMiniId; + + log = U.logger(ctx, logRef, GridDhtTxPrepareFuture.class); + + dhtMap = tx.dhtMap(); + nearMap = tx.nearMap(); + + assert dhtMap != null; + assert nearMap != null; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + return futId; + } + + /** + * @return Near mini future id. + */ + public IgniteUuid nearMiniId() { + return nearMiniId; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return tx.xidVersion(); + } + + /** + * @return Involved nodes. + */ + @Override public Collection<? extends ClusterNode> nodes() { + return + F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); + + return cctx.discovery().localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) { + if (log.isDebugEnabled()) + log.debug("Transaction future received owner changed callback: " + entry); + + boolean ret = tx.hasWriteKey(entry.txKey()); + + return ret && mapIfLocked(); + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return trackable; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + trackable = false; + } + + /** + * @return Transaction. + */ + GridDhtTxLocalAdapter<K, V> tx() { + return tx; + } + + /** + * @return {@code True} if all locks are owned. + */ + private boolean checkLocks() { + for (IgniteTxEntry<K, V> txEntry : tx.optimisticLockEntries()) { + while (true) { + GridCacheEntryEx<K, V> cached = txEntry.cached(); + + try { + // Don't compare entry against itself. 
+ if (!cached.lockedLocally(tx.xidVersion())) { + if (log.isDebugEnabled()) + log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + cached + + ", tx=" + tx + ']'); + + return false; + } + + break; // While. + } + // Possible if entry cached within transaction is obsolete. + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); + + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); + } + } + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + for (IgniteFuture<?> fut : futures()) + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.node().id().equals(nodeId)) { + f.onResult(new ClusterTopologyException("Remote node left grid (will retry): " + nodeId)); + + return true; + } + } + + return false; + } + + /** + * @param t Error. + */ + public void onError(Throwable t) { + if (err.compareAndSet(null, t)) { + tx.setRollbackOnly(); + + // TODO: GG-4005: + // TODO: as an improvement, at some point we must rollback right away. + // TODO: However, in this case need to make sure that reply is sent back + // TODO: even for non-existing transactions whenever finish request comes in. +// try { +// tx.rollback(); +// } +// catch (IgniteCheckedException ex) { +// U.error(log, "Failed to automatically rollback transaction: " + tx, ex); +// } +// + // If not local node. + if (!tx.nearNodeId().equals(cctx.localNodeId())) { + // Send reply back to near node. + GridCacheMessage<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(), tx.nearFutureId(), + nearMiniId, tx.xidVersion(), Collections.<Integer>emptySet(), t); + + try { + cctx.io().send(tx.nearNodeId(), res, tx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e); + + tx.rollbackAsync(); + } + } + + onComplete(); + } + } + + /** + * @param nodeId Sender. + * @param res Result. + */ + public void onResult(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) { + if (!isDone()) { + for (IgniteFuture<IgniteTxEx<K, V>> fut : pending()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + f.onResult(res); + + break; + } + } + } + } + } + + /** + * Marks all locks as ready for local transaction. + */ + private void readyLocks() { + // Ready all locks. + if (log.isDebugEnabled()) + log.debug("Marking all local candidates as ready: " + this); + + Iterable<IgniteTxEntry<K, V>> checkEntries = tx.groupLock() ? + Collections.singletonList(tx.groupLockEntry()) : writes; + + for (IgniteTxEntry<K, V> txEntry : checkEntries) { + if (txEntry.cached().isLocal()) + continue; + + while (true) { + GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached(); + + try { + GridCacheMvccCandidate<K> c = entry.readyLock(tx.xidVersion()); + + if (log.isDebugEnabled()) + log.debug("Current lock owner for entry [owner=" + c + ", entry=" + entry + ']'); + + break; // While. + } + // Possible if entry cached within transaction is obsolete. + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); + + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); + } + } + } + } + + /** + * Checks if all ready locks are acquired and sends requests to remote nodes in this case. + * + * @return {@code True} if all locks are acquired, {@code false} otherwise. 
+ */ + private boolean mapIfLocked() { + if (checkLocks()) { + prepare0(); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(IgniteTxEx<K, V> tx0, Throwable err) { + assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " + + "pending mini futures: " + this; + + this.err.compareAndSet(null, err); + + if (replied.compareAndSet(false, true)) { + try { + // Must clear prepare future before response is sent or listeners are notified. + if (tx.optimistic()) + tx.clearPrepareFuture(this); + + if (!tx.nearNodeId().equals(cctx.localNodeId())) { + // Send reply back to originating near node. + GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(), + tx.nearFutureId(), nearMiniId, tx.xidVersion(), tx.invalidPartitions(), this.err.get()); + + addDhtValues(res); + + GridCacheVersion min = tx.minVersion(); + + res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min)); + + res.pending(localDhtPendingVersions(tx.writeEntries(), min)); + + cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + } + + return true; + } + catch (IgniteCheckedException e) { + onError(e); + + return true; + } + finally { + // Will call super.onDone(). + onComplete(); + } + } + else { + // Other thread is completing future. Wait for it to complete. + try { + get(); + } + catch (IgniteInterruptedException e) { + onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e)); + } + catch (IgniteCheckedException ignored) { + // No-op, get() was just synchronization. + } + + return false; + } + } + + /** + * @param res Response being sent. + */ + private void addDhtValues(GridNearTxPrepareResponse<K, V> res) { + // Interceptor on near node needs old values to execute callbacks. 
+ if (!F.isEmpty(writes)) { + for (IgniteTxEntry<K, V> e : writes) { + IgniteTxEntry<K, V> txEntry = tx.entry(e.txKey()); + + assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']'; + + while (true) { + try { + GridCacheEntryEx<K, V> entry = txEntry.cached(); + + GridCacheVersion dhtVer = entry.version(); + + V val0 = null; + byte[] valBytes0 = null; + + GridCacheValueBytes valBytesTuple = entry.valueBytes(); + + if (!valBytesTuple.isNull()) { + if (valBytesTuple.isPlain()) + val0 = (V) valBytesTuple.get(); + else + valBytes0 = valBytesTuple.get(); + } + else + val0 = entry.rawGet(); + + if (val0 != null || valBytes0 != null) + res.addOwnedValue(txEntry.txKey(), dhtVer, val0, valBytes0); + + break; + } + catch (GridCacheEntryRemovedException ignored) { + // Retry. + } + } + } + } + + for (Map.Entry<IgniteTxKey<K>, GridCacheVersion> ver : dhtVerMap.entrySet()) { + IgniteTxEntry<K, V> txEntry = tx.entry(ver.getKey()); + + if (res.hasOwnedValue(ver.getKey())) + continue; + + while (true) { + try { + GridCacheEntryEx<K, V> entry = txEntry.cached(); + + GridCacheVersion dhtVer = entry.version(); + + if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) { + V val0 = null; + byte[] valBytes0 = null; + + GridCacheValueBytes valBytesTuple = entry.valueBytes(); + + if (!valBytesTuple.isNull()) { + if (valBytesTuple.isPlain()) + val0 = (V)valBytesTuple.get(); + else + valBytes0 = valBytesTuple.get(); + } + else + val0 = entry.rawGet(); + + res.addOwnedValue(txEntry.txKey(), dhtVer, val0, valBytes0); + } + + break; + } + catch (GridCacheEntryRemovedException ignored) { + // Retry. + } + } + } + } + + /** + * @param f Future. + * @return {@code True} if mini-future. + */ + private boolean isMini(IgniteFuture<?> f) { + return f.getClass().equals(MiniFuture.class); + } + + /** + * Completeness callback. + * + * @return {@code True} if {@code done} flag was changed as a result of this call. 
+ */ + private boolean onComplete() { + if (last || tx.isSystemInvalidate()) + tx.state(PREPARED); + + if (super.onDone(tx, err.get())) { + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + + return true; + } + + return false; + } + + /** + * Completes this future. + */ + public void complete() { + onComplete(); + } + + /** + * Initializes future. + * + * @param reads Read entries. + * @param writes Write entries. + * @param txNodes Transaction nodes mapping. + */ + public void prepare(Iterable<IgniteTxEntry<K, V>> reads, Iterable<IgniteTxEntry<K, V>> writes, + Map<UUID, Collection<UUID>> txNodes) { + if (tx.empty()) { + tx.setRollbackOnly(); + + onDone(tx); + } + + this.reads = reads; + this.writes = writes; + this.txNodes = txNodes; + + readyLocks(); + + mapIfLocked(); + } + + /** + * @param backupId Backup node ID. + * @return {@code True} if backup node receives last prepare request for this transaction. + */ + private boolean lastBackup(UUID backupId) { + return lastBackups != null && lastBackups.contains(backupId); + } + + /** + * + */ + private void prepare0() { + if (!mapped.compareAndSet(false, true)) + return; + + try { + Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>(); + Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>(); + + boolean hasRemoteNodes = false; + + // Assign keys to primary nodes. + if (!F.isEmpty(reads)) { + for (IgniteTxEntry<K, V> read : reads) + hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap); + } + + if (!F.isEmpty(writes)) { + for (IgniteTxEntry<K, V> write : writes) + hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap); + } + + if (isDone()) + return; + + tx.needsCompletedVersions(hasRemoteNodes); + + // Create mini futures. 
+ for (GridDistributedTxMapping<K, V> dhtMapping : futDhtMap.values()) { + assert !dhtMapping.empty(); + + ClusterNode n = dhtMapping.node(); + + assert !n.isLocal(); + + GridDistributedTxMapping<K, V> nearMapping = futNearMap.get(n.id()); + + MiniFuture fut = new MiniFuture(n.id(), dhtMap.get(n.id()), nearMap.get(n.id())); + + add(fut); // Append new future. + + Collection<IgniteTxEntry<K, V>> nearWrites = nearMapping == null ? null : nearMapping.writes(); + + GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + dhtMapping.writes(), + nearWrites, + tx.groupLockKey(), + tx.partitionLock(), + txNodes, + tx.nearXidVersion(), + lastBackup(n.id()), + tx.subjectId(), + tx.taskNameHash()); + + int idx = 0; + + for (IgniteTxEntry<K, V> entry : dhtMapping.writes()) { + try { + GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); + + GridCacheMvccCandidate<K> added = cached.candidate(version()); + + assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; + + if (added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + + req.invalidateNearEntry(idx, cached.readerId(n.id()) != null); + + if (cached.isNewLocked()) + req.markKeyForPreload(idx); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } + + idx++; + } + + if (!F.isEmpty(nearWrites)) { + for (IgniteTxEntry<K, V> entry : nearWrites) { + try { + GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); + + assert added != null; + assert added.dhtLocal(); + + if (added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + + break; + } + catch 
(GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } + } + } + + //noinspection TryWithIdenticalCatches + try { + cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + } + catch (ClusterTopologyException e) { + fut.onResult(e); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } + } + + for (GridDistributedTxMapping<K, V> nearMapping : futNearMap.values()) { + if (!futDhtMap.containsKey(nearMapping.node().id())) { + assert nearMapping.writes() != null; + + MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping); + + add(fut); // Append new future. + + GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + null, + nearMapping.writes(), + tx.groupLockKey(), + tx.partitionLock(), + null, + tx.nearXidVersion(), + false, + tx.subjectId(), + tx.taskNameHash()); + + for (IgniteTxEntry<K, V> entry : nearMapping.writes()) { + try { + GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); + + assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; + + if (added != null && added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } + } + + //noinspection TryWithIdenticalCatches + try { + cctx.io().send(nearMapping.node(), req, tx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL); + } + catch (ClusterTopologyException e) { + fut.onResult(e); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } + } + } + } + finally { + markInitialized(); + } + } + + /** + * @param entry Transaction entry. + * @param futDhtMap DHT mapping. + * @param futNearMap Near mapping. + * @return {@code True} if mapped. + */ + private boolean map( + IgniteTxEntry<K, V> entry, + Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap, + Map<UUID, GridDistributedTxMapping<K, V>> futNearMap) { + if (entry.cached().isLocal()) + return false; + + GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); + + boolean ret; + + GridCacheContext<K, V> cacheCtx = entry.context(); + + GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + + while (true) { + try { + Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion()); + + if (log.isDebugEnabled()) + log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) + + ", entry=" + entry + ']'); + + Collection<UUID> readers = cached.readers(); + + Collection<ClusterNode> nearNodes = null; + + if (!F.isEmpty(readers)) { + nearNodes = cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId()))); + + if (log.isDebugEnabled()) + log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) + + ", entry=" + entry + ']'); + } + else if (log.isDebugEnabled()) + log.debug("Entry has no near readers: " + entry); + + // Exclude local node. + ret = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap); + + // Exclude DHT nodes. + ret |= map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + cached = dht.entryExx(entry.key()); + + entry.cached(cached, cached.keyBytes()); + } + } + + return ret; + } + + /** + * @param entry Entry. + * @param nodes Nodes. 
+ * @param globalMap Map. + * @param locMap Exclude map. + * @return {@code True} if mapped. + */ + private boolean map(IgniteTxEntry<K, V> entry, Iterable<ClusterNode> nodes, + Map<UUID, GridDistributedTxMapping<K, V>> globalMap, Map<UUID, GridDistributedTxMapping<K, V>> locMap) { + boolean ret = false; + + if (nodes != null) { + for (ClusterNode n : nodes) { + GridDistributedTxMapping<K, V> global = globalMap.get(n.id()); + + if (global == null) + globalMap.put(n.id(), global = new GridDistributedTxMapping<>(n)); + + global.add(entry); + + GridDistributedTxMapping<K, V> loc = locMap.get(n.id()); + + if (loc == null) + locMap.put(n.id(), loc = new GridDistributedTxMapping<>(n)); + + loc.add(entry); + + ret = true; + } + } + + return ret; + } + + /** + * Collects versions of pending candidates versions less than base. + * + * @param entries Tx entries to process. + * @param baseVer Base version. + * @return Collection of pending candidates versions. + */ + private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<IgniteTxEntry<K, V>> entries, + GridCacheVersion baseVer) { + Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5); + + for (IgniteTxEntry<K, V> entry : entries) { + try { + for (GridCacheMvccCandidate cand : entry.cached().localCandidates()) { + if (cand.version().isLess(baseVer)) + lessPending.add(cand.version()); + } + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, no candidates. + } + } + + return lessPending; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtTxPrepareFuture.class, this, "super", super.toString()); + } + + /** + * Mini-future for get operations. Mini-futures are only waiting on a single + * node as opposed to multiple nodes. + */ + private class MiniFuture extends GridFutureAdapter<IgniteTxEx<K, V>> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** Node ID. 
*/ + private UUID nodeId; + + /** DHT mapping. */ + @GridToStringInclude + private GridDistributedTxMapping<K, V> dhtMapping; + + /** Near mapping. */ + @GridToStringInclude + private GridDistributedTxMapping<K, V> nearMapping; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public MiniFuture() { + super(cctx.kernalContext()); + } + + /** + * @param nodeId Node ID. + * @param dhtMapping Mapping. + * @param nearMapping nearMapping. + */ + MiniFuture(UUID nodeId, GridDistributedTxMapping<K, V> dhtMapping, GridDistributedTxMapping<K, V> nearMapping) { + super(cctx.kernalContext()); + + assert dhtMapping == null || nearMapping == null || dhtMapping.node() == nearMapping.node(); + + this.nodeId = nodeId; + this.dhtMapping = dhtMapping; + this.nearMapping = nearMapping; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return dhtMapping != null ? dhtMapping.node() : nearMapping.node(); + } + + /** + * @param e Error. + */ + void onResult(Throwable e) { + if (log.isDebugEnabled()) + log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); + + // Fail. + onDone(e); + } + + /** + * @param e Node failure. + */ + void onResult(ClusterTopologyException e) { + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this); + + if (tx != null) + tx.removeMapping(nodeId); + + onDone(tx); + } + + /** + * @param res Result callback. + */ + void onResult(GridDhtTxPrepareResponse<K, V> res) { + if (res.error() != null) + // Fail the whole compound future. + onError(res.error()); + else { + // Process evicted readers (no need to remap). 
+ if (nearMapping != null && !F.isEmpty(res.nearEvicted())) { + nearMapping.evictReaders(res.nearEvicted()); + + for (IgniteTxEntry<K, V> entry : nearMapping.entries()) { + if (res.nearEvicted().contains(entry.txKey())) { + while (true) { + try { + GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); + + cached.removeReader(nearMapping.node().id(), res.messageId()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + GridCacheEntryEx<K, V> e = entry.context().cache().peekEx(entry.key()); + + if (e == null) + break; + + entry.cached(e, entry.keyBytes()); + } + } + } + } + } + + // Process invalid partitions (no need to remap). + if (!F.isEmpty(res.invalidPartitions())) { + for (Iterator<IgniteTxEntry<K, V>> it = dhtMapping.entries().iterator(); it.hasNext();) { + IgniteTxEntry<K, V> entry = it.next(); + + if (res.invalidPartitions().contains(entry.cached().partition())) { + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + + ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); + } + } + + if (dhtMapping.empty()) { + dhtMap.remove(nodeId); + + if (log.isDebugEnabled()) + log.debug("Removed mapping for node entirely because all partitions are invalid [nodeId=" + + nodeId + ", tx=" + tx + ']'); + } + } + + long topVer = tx.topologyVersion(); + + boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED); + + for (GridCacheEntryInfo<K, V> info : res.preloadEntries()) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId()); + + while (true) { + GridCacheEntryEx<K, V> entry = cacheCtx.cache().entryEx(info.key()); + + GridDrType drType = cacheCtx.isDrEnabled() ? 
GridDrType.DR_PRELOAD : GridDrType.DR_NONE; + + try { + if (entry.initialValue(info.value(), info.valueBytes(), info.version(), + info.ttl(), info.expireTime(), true, topVer, drType)) { + if (rec && !entry.isInternal()) + cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_PRELOAD_OBJECT_LOADED, info.value(), true, null, + false, null, null, null); + } + + break; + } + catch (IgniteCheckedException e) { + // Fail the whole thing. + onDone(e); + + return; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to set entry initial value (entry is obsolete, " + + "will retry): " + entry); + } + } + } + + // Finish mini future. + onDone(tx); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
new file mode 100644
index 0000000..b458b5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * DHT transaction prepare request.
+ * <p>
+ * Near writes and the owned-versions map are transient for direct marshalling:
+ * {@link #prepareMarshal} serializes them into {@code nearWritesBytes} and {@code ownedBytes},
+ * {@link #writeTo} sends those byte forms, and {@link #finishUnmarshal} rebuilds the object
+ * forms on the receiving side.
+ */
+public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequest<K, V> {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Near node ID (taken from the transaction in the constructor). */
+    private UUID nearNodeId;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private IgniteUuid miniId;
+
+    /** Topology version. */
+    private long topVer;
+
+    /** Per-entry "invalidate near entry" flags, indexed like the DHT writes. */
+    private BitSet invalidateNearEntries;
+
+    /** Near writes (not sent directly; see {@link #nearWritesBytes}). */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Collection<IgniteTxEntry<K, V>> nearWrites;
+
+    /** Serialized near writes, one byte array per entry. */
+    @GridDirectCollection(byte[].class)
+    private Collection<byte[]> nearWritesBytes;
+
+    /** Owned versions by key (not sent directly; see {@link #ownedBytes}). */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<IgniteTxKey<K>, GridCacheVersion> owned;
+
+    /** Serialized {@link #owned} map. */
+    private byte[] ownedBytes;
+
+    /** Near transaction ID. */
+    private GridCacheVersion nearXidVer;
+
+    /** {@code True} if this is last prepare request for node. */
+    private boolean last;
+
+    /** Subject ID. */
+    @GridDirectVersion(1)
+    private UUID subjId;
+
+    /** Task name hash. */
+    @GridDirectVersion(2)
+    private int taskNameHash;
+
+    /** Indexes of keys marked for preloading (see {@link #markKeyForPreload(int)}); lazily created. */
+    @GridDirectVersion(3)
+    private BitSet preloadKeys;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtTxPrepareRequest() {
+        // No-op.
+    }
+
+    /**
+     * Creates a prepare request for the given DHT transaction. The near node ID is taken
+     * from {@code tx}.
+     *
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     * @param topVer Topology version.
+     * @param tx Transaction.
+     * @param dhtWrites DHT writes.
+     * @param nearWrites Near writes.
+     * @param grpLockKey Group lock key if preparing group-lock transaction.
+     * @param partLock {@code True} if group-lock transaction locks partition.
+     * @param txNodes Transaction nodes mapping.
+     * @param nearXidVer Near transaction ID.
+     * @param last {@code True} if this is last prepare request for node.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     */
+    public GridDhtTxPrepareRequest(
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        long topVer,
+        GridDhtTxLocalAdapter<K, V> tx,
+        Collection<IgniteTxEntry<K, V>> dhtWrites,
+        Collection<IgniteTxEntry<K, V>> nearWrites,
+        IgniteTxKey grpLockKey,
+        boolean partLock,
+        Map<UUID, Collection<UUID>> txNodes,
+        GridCacheVersion nearXidVer,
+        boolean last,
+        UUID subjId,
+        int taskNameHash) {
+        super(tx, null, dhtWrites, grpLockKey, partLock, txNodes);
+
+        assert futId != null;
+        assert miniId != null;
+
+        this.topVer = topVer;
+        this.futId = futId;
+        this.nearWrites = nearWrites;
+        this.miniId = miniId;
+        this.nearXidVer = nearXidVer;
+        this.last = last;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+
+        // Initial capacity: one invalidation flag per DHT write.
+        invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size());
+
+        nearNodeId = tx.nearNodeId();
+    }
+
+    /**
+     * @return {@code True} if this is last prepare request for node.
+     */
+    public boolean last() {
+        return last;
+    }
+
+    /**
+     * @return Near transaction ID.
+     */
+    public GridCacheVersion nearXidVersion() {
+        return nearXidVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allowForStartup() {
+        return true;
+    }
+
+    /**
+     * @return Near node ID.
+     */
+    public UUID nearNodeId() {
+        return nearNodeId;
+    }
+
+    /**
+     * @return Subject ID.
+     */
+    @Nullable public UUID subjectId() {
+        return subjId;
+    }
+
+    /**
+     * @return Task name hash.
+     */
+    public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return Near writes, or an empty list if none were set.
+     */
+    public Collection<IgniteTxEntry<K, V>> nearWrites() {
+        return nearWrites == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : nearWrites;
+    }
+
+    /**
+     * @param idx Entry index to set invalidation flag.
+     * @param invalidate Invalidation flag value.
+     */
+    public void invalidateNearEntry(int idx, boolean invalidate) {
+        invalidateNearEntries.set(idx, invalidate);
+    }
+
+    /**
+     * @param idx Index to get invalidation flag value.
+     * @return Invalidation flag value.
+     */
+    public boolean invalidateNearEntry(int idx) {
+        return invalidateNearEntries.get(idx);
+    }
+
+    /**
+     * Marks the key at the given index for preloading (queried via {@link #needPreloadKey(int)}).
+     *
+     * @param idx Key index.
+     */
+    public void markKeyForPreload(int idx) {
+        if (preloadKeys == null)
+            preloadKeys = new BitSet();
+
+        preloadKeys.set(idx, true);
+    }
+
+    /**
+     * Checks whether entry info should be sent to primary node from backup.
+     *
+     * @param idx Index.
+     * @return {@code True} if value should be sent, {@code false} otherwise.
+     */
+    public boolean needPreloadKey(int idx) {
+        return preloadKeys != null && preloadKeys.get(idx);
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public IgniteUuid miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    @Override public long topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * Sets owner and its mapped version.
+     *
+     * @param key Key.
+     * @param ownerMapped Owner mapped version.
+     */
+    public void owned(IgniteTxKey<K> key, GridCacheVersion ownerMapped) {
+        if (owned == null)
+            owned = new GridLeanMap<>(3);
+
+        owned.put(key, ownerMapped);
+    }
+
+    /**
+     * @return Owned versions map (may be {@code null} if no owner was set).
+     */
+    public Map<IgniteTxKey<K>, GridCacheVersion> owned() {
+        return owned;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        // The owned map is serialized only once; later calls reuse ownedBytes.
+        if (ownedBytes == null && owned != null) {
+            ownedBytes = CU.marshal(ctx, owned);
+
+            if (ctx.deploymentEnabled()) {
+                for (IgniteTxKey<K> k : owned.keySet())
+                    prepareObject(k, ctx);
+            }
+        }
+
+        // Near writes are re-serialized on every call, each entry into its own byte array.
+        if (nearWrites != null) {
+            marshalTx(nearWrites, ctx);
+
+            nearWritesBytes = new ArrayList<>(nearWrites.size());
+
+            for (IgniteTxEntry<K, V> e : nearWrites)
+                nearWritesBytes.add(ctx.marshaller().marshal(e));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (ownedBytes != null && owned == null)
+            owned = ctx.marshaller().unmarshal(ownedBytes, ldr);
+
+        if (nearWritesBytes != null) {
+            nearWrites = new ArrayList<>(nearWritesBytes.size());
+
+            for (byte[] arr : nearWritesBytes)
+                nearWrites.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));
+
+            unmarshalTx(nearWrites, true, ctx, ldr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxPrepareRequest.class, this, "super", super.toString());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDhtTxPrepareRequest _clone = new GridDhtTxPrepareRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDhtTxPrepareRequest _clone = (GridDhtTxPrepareRequest)_msg;
+
+        // Shallow copy: collections, maps and bit sets are shared with the clone.
+        _clone.nearNodeId = nearNodeId;
+        _clone.futId = futId;
+        _clone.miniId = miniId;
+        _clone.topVer = topVer;
+        _clone.invalidateNearEntries = invalidateNearEntries;
+        _clone.nearWrites = nearWrites;
+        _clone.nearWritesBytes = nearWritesBytes;
+        _clone.owned = owned;
+        _clone.ownedBytes = ownedBytes;
+        _clone.nearXidVer = nearXidVer;
+        _clone.last = last;
+        _clone.subjId = subjId;
+        _clone.taskNameHash = taskNameHash;
+        _clone.preloadKeys = preloadKeys;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        // The cases below fall through on purpose. Each case writes one field and then advances
+        // commState.idx; if a put*() call reports the buffer is full, the method returns false
+        // without advancing, so the next call resumes at the first field not yet written.
+        // readFrom() reads the fields back using the same indices in the same order.
+        // NOTE(review): indices start at 22, presumably because super.writeTo() uses the lower
+        // ones -- confirm against GridDistributedTxPrepareRequest.
+        switch (commState.idx) {
+            case 22:
+                if (!commState.putGridUuid(futId))
+                    return false;
+
+                commState.idx++;
+
+            case 23:
+                if (!commState.putBitSet(invalidateNearEntries))
+                    return false;
+
+                commState.idx++;
+
+            case 24:
+                if (!commState.putBoolean(last))
+                    return false;
+
+                commState.idx++;
+
+            case 25:
+                if (!commState.putGridUuid(miniId))
+                    return false;
+
+                commState.idx++;
+
+            case 26:
+                if (!commState.putUuid(nearNodeId))
+                    return false;
+
+                commState.idx++;
+
+            case 27:
+                // Collection: size (-1 for null) followed by the elements; commState.it and
+                // commState.cur remember the position if the buffer fills mid-collection.
+                if (nearWritesBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(nearWritesBytes.size()))
+                            return false;
+
+                        commState.it = nearWritesBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 28:
+                if (!commState.putCacheVersion(nearXidVer))
+                    return false;
+
+                commState.idx++;
+
+            case 29:
+                if (!commState.putByteArray(ownedBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 30:
+                if (!commState.putLong(topVer))
+                    return false;
+
+                commState.idx++;
+
+            // Cases 31-33 carry the fields tagged with @GridDirectVersion(1..3).
+            case 31:
+                if (!commState.putUuid(subjId))
+                    return false;
+
+                commState.idx++;
+
+            case 32:
+                if (!commState.putInt(taskNameHash))
+                    return false;
+
+                commState.idx++;
+
+            case 33:
+                if (!commState.putBitSet(preloadKeys))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        // Mirror of writeTo(): cases fall through on purpose. A *_NOT_READ sentinel or too few
+        // remaining bytes means the buffer ran out; return false and resume at commState.idx.
+        switch (commState.idx) {
+            case 22:
+                IgniteUuid futId0 = commState.getGridUuid();
+
+                if (futId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                futId = futId0;
+
+                commState.idx++;
+
+            case 23:
+                BitSet invalidateNearEntries0 = commState.getBitSet();
+
+                if (invalidateNearEntries0 == BIT_SET_NOT_READ)
+                    return false;
+
+                invalidateNearEntries = invalidateNearEntries0;
+
+                commState.idx++;
+
+            case 24:
+                if (buf.remaining() < 1)
+                    return false;
+
+                last = commState.getBoolean();
+
+                commState.idx++;
+
+            case 25:
+                IgniteUuid miniId0 = commState.getGridUuid();
+
+                if (miniId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                miniId = miniId0;
+
+                commState.idx++;
+
+            case 26:
+                UUID nearNodeId0 = commState.getUuid();
+
+                if (nearNodeId0 == UUID_NOT_READ)
+                    return false;
+
+                nearNodeId = nearNodeId0;
+
+                commState.idx++;
+
+            case 27:
+                // Size prefix (-1 means null collection); readItems tracks partial progress.
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (nearWritesBytes == null)
+                        nearWritesBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        nearWritesBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 28:
+                GridCacheVersion nearXidVer0 = commState.getCacheVersion();
+
+                if (nearXidVer0 == CACHE_VER_NOT_READ)
+                    return false;
+
+                nearXidVer = nearXidVer0;
+
+                commState.idx++;
+
+            case 29:
+                byte[] ownedBytes0 = commState.getByteArray();
+
+                if (ownedBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                ownedBytes = ownedBytes0;
+
+                commState.idx++;
+
+            case 30:
+                if (buf.remaining() < 8)
+                    return false;
+
+                topVer = commState.getLong();
+
+                commState.idx++;
+
+            case 31:
+                UUID subjId0 = commState.getUuid();
+
+                if (subjId0 == UUID_NOT_READ)
+                    return false;
+
+                subjId = subjId0;
+
+                commState.idx++;
+
+            case 32:
+                if (buf.remaining() < 4)
+                    return false;
+
+                taskNameHash = commState.getInt();
+
+                commState.idx++;
+
+            case 33:
+                BitSet preloadKeys0 = commState.getBitSet();
+
+                if (preloadKeys0 == BIT_SET_NOT_READ)
+                    return false;
+
+                preloadKeys = preloadKeys0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        // Message type byte written by writeTo().
+        return 33;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
new file mode 100644
index 0000000..0c2bfd1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * DHT transaction prepare response.
+ * <p>
+ * Evicted near readers and preload entries are transient for direct marshalling:
+ * {@link #prepareMarshal} serializes them into byte-array collections, {@link #writeTo}
+ * sends those, and {@link #finishUnmarshal} rebuilds the object forms on the receiving side.
+ */
+public class GridDhtTxPrepareResponse<K, V> extends GridDistributedTxPrepareResponse<K, V> {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Evicted readers. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Collection<IgniteTxKey<K>> nearEvicted;
+
+    /** Serialized {@link #nearEvicted} keys. */
+    @GridDirectCollection(byte[].class)
+    private Collection<byte[]> nearEvictedBytes;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private IgniteUuid miniId;
+
+    /** Invalid partitions. */
+    @GridToStringInclude
+    @GridDirectCollection(int.class)
+    private Collection<Integer> invalidParts;
+
+    /** Preload entries (not sent directly; see {@link #preloadEntriesBytes}). */
+    @GridDirectTransient
+    private List<GridCacheEntryInfo<K, V>> preloadEntries;
+
+    /** Serialized {@link #preloadEntries}. */
+    @GridDirectCollection(byte[].class)
+    @GridDirectVersion(1)
+    private List<byte[]> preloadEntriesBytes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridDhtTxPrepareResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param xid Xid version.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     */
+    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
+        super(xid);
+
+        assert futId != null;
+        assert miniId != null;
+
+        this.futId = futId;
+        this.miniId = miniId;
+    }
+
+    /**
+     * @param xid Xid version.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     * @param err Error.
+     */
+    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err) {
+        super(xid, err);
+
+        assert futId != null;
+        assert miniId != null;
+
+        this.futId = futId;
+        this.miniId = miniId;
+    }
+
+    /**
+     * @return Evicted readers.
+     */
+    public Collection<IgniteTxKey<K>> nearEvicted() {
+        return nearEvicted;
+    }
+
+    /**
+     * @param nearEvicted Evicted readers.
+     */
+    public void nearEvicted(Collection<IgniteTxKey<K>> nearEvicted) {
+        this.nearEvicted = nearEvicted;
+    }
+
+    /**
+     * Sets already-serialized evicted readers; {@link #prepareMarshal} then skips re-serializing them.
+     *
+     * @param nearEvictedBytes Near evicted bytes.
+     */
+    public void nearEvictedBytes(Collection<byte[]> nearEvictedBytes) {
+        this.nearEvictedBytes = nearEvictedBytes;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public IgniteUuid miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return Invalid partitions.
+     */
+    public Collection<Integer> invalidPartitions() {
+        return invalidParts;
+    }
+
+    /**
+     * @param invalidParts Invalid partitions.
+     */
+    public void invalidPartitions(Collection<Integer> invalidParts) {
+        this.invalidParts = invalidParts;
+    }
+
+    /**
+     * Gets preload entries found on backup node.
+     *
+     * @return Collection of entry infos need to be preloaded (empty if none were added).
+     */
+    public Collection<GridCacheEntryInfo<K, V>> preloadEntries() {
+        return preloadEntries == null ? Collections.<GridCacheEntryInfo<K, V>>emptyList() : preloadEntries;
+    }
+
+    /**
+     * Adds preload entry.
+     *
+     * @param info Info to add.
+     */
+    public void addPreloadEntry(GridCacheEntryInfo<K, V> info) {
+        if (preloadEntries == null)
+            preloadEntries = new ArrayList<>();
+
+        preloadEntries.add(info);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        // NOTE(review): called even when nearEvicted is null; presumably marshalCollection()
+        // tolerates a null collection -- confirm.
+        if (nearEvictedBytes == null)
+            nearEvictedBytes = marshalCollection(nearEvicted, ctx);
+
+        if (preloadEntriesBytes == null && preloadEntries != null)
+            preloadEntriesBytes = marshalCollection(preloadEntries, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        // Unmarshal even if deployment is disabled, since we could get bytes initially.
+        if (nearEvicted == null && nearEvictedBytes != null)
+            nearEvicted = unmarshalCollection(nearEvictedBytes, ctx, ldr);
+
+        if (preloadEntries == null && preloadEntriesBytes != null)
+            preloadEntries = unmarshalCollection(preloadEntriesBytes, ctx, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDhtTxPrepareResponse _clone = new GridDhtTxPrepareResponse();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDhtTxPrepareResponse _clone = (GridDhtTxPrepareResponse)_msg;
+
+        // Shallow copy: collections are shared with the clone.
+        _clone.nearEvicted = nearEvicted;
+        _clone.nearEvictedBytes = nearEvictedBytes;
+        _clone.futId = futId;
+        _clone.miniId = miniId;
+        _clone.invalidParts = invalidParts;
+        _clone.preloadEntries = preloadEntries;
+        _clone.preloadEntriesBytes = preloadEntriesBytes;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        // The cases below fall through on purpose. Each case writes one field and then advances
+        // commState.idx; if a put*() call reports the buffer is full, the method returns false
+        // without advancing, so the next call resumes at the first field not yet written.
+        // Collections are written as a size (-1 for null) followed by the elements.
+        // NOTE(review): indices start at 10, presumably because super.writeTo() uses the lower
+        // ones -- confirm against GridDistributedTxPrepareResponse.
+        switch (commState.idx) {
+            case 10:
+                if (!commState.putGridUuid(futId))
+                    return false;
+
+                commState.idx++;
+
+            case 11:
+                if (invalidParts != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(invalidParts.size()))
+                            return false;
+
+                        commState.it = invalidParts.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putInt((int)commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 12:
+                if (!commState.putGridUuid(miniId))
+                    return false;
+
+                commState.idx++;
+
+            case 13:
+                if (nearEvictedBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(nearEvictedBytes.size()))
+                            return false;
+
+                        commState.it = nearEvictedBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            // Field tagged with @GridDirectVersion(1).
+            case 14:
+                if (preloadEntriesBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(preloadEntriesBytes.size()))
+                            return false;
+
+                        commState.it = preloadEntriesBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        // Mirror of writeTo(): cases fall through on purpose. A *_NOT_READ sentinel or too few
+        // remaining bytes means the buffer ran out; return false and resume at commState.idx.
+        // For collections, readSize == -1 means "size not read yet" and readItems tracks progress.
+        switch (commState.idx) {
+            case 10:
+                IgniteUuid futId0 = commState.getGridUuid();
+
+                if (futId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                futId = futId0;
+
+                commState.idx++;
+
+            case 11:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (invalidParts == null)
+                        invalidParts = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        if (buf.remaining() < 4)
+                            return false;
+
+                        int _val = commState.getInt();
+
+                        invalidParts.add((Integer)_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 12:
+                IgniteUuid miniId0 = commState.getGridUuid();
+
+                if (miniId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                miniId = miniId0;
+
+                commState.idx++;
+
+            case 13:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (nearEvictedBytes == null)
+                        nearEvictedBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        nearEvictedBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 14:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (preloadEntriesBytes == null)
+                        preloadEntriesBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        preloadEntriesBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        // Message type byte written by writeTo().
+        return 34;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
new file mode 100644
index 0000000..524493d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+
+import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+
+/**
+ * Transaction created by system implicitly on remote nodes.
+ * <p>
+ * Both constructors start with an empty read map and a {@code ConcurrentLinkedHashMap}
+ * write map sized to the expected transaction size.
+ */
+public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Near node ID. */
+    private UUID nearNodeId;
+
+    /** Remote future ID. */
+    private IgniteUuid rmtFutId;
+
+    /** Near transaction ID. */
+    private GridCacheVersion nearXidVer;
+
+    /** Transaction nodes mapping (primary node -> related backup nodes). */
+    private Map<UUID, Collection<UUID>> txNodes;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtTxRemote() {
+        // No-op.
+    }
+
+    /**
+     * This constructor is meant for optimistic transactions.
+     *
+     * @param ctx Cache context.
+     * @param nearNodeId Near node ID.
+     * @param rmtFutId Remote future ID.
+     * @param nodeId Node ID.
+     * @param rmtThreadId Remote thread ID.
+     * @param topVer Topology version.
+     * @param xidVer XID version.
+     * @param commitVer Commit version.
+     * @param sys System flag.
+     * @param concurrency Concurrency level.
+     * @param isolation Transaction isolation.
+     * @param invalidate Invalidate flag.
+     * @param timeout Timeout.
+     * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if this is a group-lock transaction.
+     * @param nearXidVer Near transaction ID.
+     * @param txNodes Transaction nodes mapping.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     */
+    public GridDhtTxRemote(
+        GridCacheSharedContext<K, V> ctx,
+        UUID nearNodeId,
+        IgniteUuid rmtFutId,
+        UUID nodeId,
+        long rmtThreadId,
+        long topVer,
+        GridCacheVersion xidVer,
+        GridCacheVersion commitVer,
+        boolean sys,
+        IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation,
+        boolean invalidate,
+        long timeout,
+        int txSize,
+        @Nullable IgniteTxKey grpLockKey,
+        GridCacheVersion nearXidVer,
+        Map<UUID, Collection<UUID>> txNodes,
+        @Nullable UUID subjId,
+        int taskNameHash
+    ) {
+        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
+            grpLockKey, subjId, taskNameHash);
+
+        assert nearNodeId != null;
+        assert rmtFutId != null;
+
+        this.nearNodeId = nearNodeId;
+        this.rmtFutId = rmtFutId;
+        this.nearXidVer = nearXidVer;
+        this.txNodes = txNodes;
+
+        readMap = Collections.emptyMap();
+
+        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+
+        topologyVersion(topVer);
+    }
+
+    /**
+     * This constructor is meant for pessimistic transactions.
+     * <p>
+     * NOTE(review): unlike the optimistic constructor, this one does not set {@code txNodes},
+     * so {@link #transactionNodes()} returns {@code null} for transactions created here --
+     * confirm callers expect that.
+     *
+     * @param ctx Cache context.
+     * @param nearNodeId Near node ID.
+     * @param rmtFutId Remote future ID.
+     * @param nodeId Node ID.
+     * @param nearXidVer Near transaction ID.
+     * @param rmtThreadId Remote thread ID.
+     * @param topVer Topology version.
+     * @param xidVer XID version.
+     * @param commitVer Commit version.
+     * @param sys System flag.
+     * @param concurrency Concurrency level (should be pessimistic).
+     * @param isolation Transaction isolation.
+     * @param invalidate Invalidate flag.
+     * @param timeout Timeout.
+     * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if transaction is group-lock.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     */
+    public GridDhtTxRemote(
+        GridCacheSharedContext<K, V> ctx,
+        UUID nearNodeId,
+        IgniteUuid rmtFutId,
+        UUID nodeId,
+        GridCacheVersion nearXidVer,
+        long rmtThreadId,
+        long topVer,
+        GridCacheVersion xidVer,
+        GridCacheVersion commitVer,
+        boolean sys,
+        IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation,
+        boolean invalidate,
+        long timeout,
+        int txSize,
+        @Nullable IgniteTxKey grpLockKey,
+        @Nullable UUID subjId,
+        int taskNameHash
+    ) {
+        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
+            grpLockKey, subjId, taskNameHash);
+
+        assert nearNodeId != null;
+        assert rmtFutId != null;
+
+        this.nearXidVer = nearXidVer;
+        this.nearNodeId = nearNodeId;
+        this.rmtFutId = rmtFutId;
+
+        readMap = Collections.emptyMap();
+        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+
+        topologyVersion(topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean dht() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID eventNodeId() {
+        return nearNodeId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> masterNodeIds() {
+        return Arrays.asList(nearNodeId, nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID otherNodeId() {
+        return nearNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean enforceSerializable() {
+        return false; // Serializable will be enforced on primary node.
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion nearXidVersion() {
+        return nearXidVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<UUID, Collection<UUID>> transactionNodes() {
+        return txNodes;
+    }
+
+    /**
+     * @return Near node ID.
+     */
+    UUID nearNodeId() {
+        return nearNodeId;
+    }
+
+    /**
+     * @return Remote future ID.
+     */
+    IgniteUuid remoteFutureId() {
+        return rmtFutId;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) {
+        // Never for non-DHT caches, caches without near cache, or when this node is the near node itself.
+        if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || cctx.localNodeId().equals(nearNodeId))
+            return false;
+
+        // With no backups configured there is no backup-node case to exclude.
+        if (cacheCtx.config().getBackups() == 0)
+            return true;
+
+        // Check if we are on the backup node.
+        return !cacheCtx.affinity().backups(key, topVer).contains(cctx.localNode());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part) {
+        super.addInvalidPartition(cacheCtx, part);
+
+        // Drop every pending write that maps to the invalid partition. The partition is taken from
+        // the cached entry when present, otherwise computed via cacheCtx affinity.
+        // NOTE(review): entries are matched by partition number only, not by cache, so in a
+        // transaction spanning several caches, writes of other caches with the same partition
+        // number would be removed too -- confirm this is intended.
+        for (Iterator<IgniteTxEntry<K, V>> it = writeMap.values().iterator(); it.hasNext();) {
+            IgniteTxEntry<K, V> e = it.next();
+
+            GridCacheEntryEx<K, V> cached = e.cached();
+
+            if (cached != null) {
+                if (cached.partition() == part)
+                    it.remove();
+            }
+            else if (cacheCtx.affinity().partition(e.key()) == part)
+                it.remove();
+        }
+    }
+
+    /**
+     * Unmarshals a received write entry, binds it to its DHT cache entry and adds it to the
+     * write map. If the entry's partition is invalid on this node, the partition is recorded
+     * via {@link #addInvalidPartition} instead.
+     *
+     * @param entry Write entry.
+     * @param ldr Class loader.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void addWrite(IgniteTxEntry<K, V> entry, ClassLoader ldr) throws IgniteCheckedException {
+        entry.unmarshal(cctx, false, ldr);
+
+        GridCacheContext<K, V> cacheCtx = entry.context();
+
+        try {
+            GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(entry.key(), topologyVersion());
+
+            checkInternal(entry.txKey());
+
+            // Initialize cache entry.
+            entry.cached(cached, entry.keyBytes());
+
+            writeMap.put(entry.txKey(), entry);
+
+            addExplicit(entry);
+        }
+        catch (GridDhtInvalidPartitionException e) {
+            addInvalidPartition(cacheCtx, e.partition());
+        }
+    }
+
+    /**
+     * Creates a write entry for the given key and adds it to the write map. Does nothing
+     * (after {@code checkInternal(key)}) when {@code isSystemInvalidate()} returns {@code true}.
+     *
+     * @param cacheCtx Cache context.
+     * @param op Write operation.
+     * @param key Key to add to write set.
+     * @param keyBytes Key bytes.
+     * @param val Value.
+     * @param valBytes Value bytes.
+     * @param entryProcessors Entry processors.
+     * @param drVer Data center replication version.
+     * @param ttl TTL.
+     */
+    public void addWrite(GridCacheContext<K, V> cacheCtx,
+        GridCacheOperation op,
+        IgniteTxKey<K> key,
+        byte[] keyBytes,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors,
+        @Nullable GridCacheVersion drVer,
+        long ttl) {
+        checkInternal(key);
+
+        if (isSystemInvalidate())
+            return;
+
+        GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(key.key(), topologyVersion());
+
+        IgniteTxEntry<K, V> txEntry = new IgniteTxEntry<>(cacheCtx,
+            this,
+            op,
+            val,
+            ttl,
+            -1L,
+            cached,
+            drVer);
+
+        txEntry.keyBytes(keyBytes);
+        txEntry.valueBytes(valBytes);
+        txEntry.entryProcessors(entryProcessors);
+
+        writeMap.put(key, txEntry);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return GridToStringBuilder.toString(GridDhtTxRemote.class, this, "super", super.toString());
+    }
+}