http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java deleted file mode 100644 index 9e43c68..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ /dev/null @@ -1,1074 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.dr.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.transactions.IgniteTxState.*; -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; - -/** - * - */ -public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteTxEx<K, V>> - implements GridCacheMvccFuture<K, V, IgniteTxEx<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Context. */ - private GridCacheSharedContext<K, V> cctx; - - /** Future ID. */ - private IgniteUuid futId; - - /** Transaction. */ - @GridToStringExclude - private GridDhtTxLocalAdapter<K, V> tx; - - /** Near mappings. */ - private Map<UUID, GridDistributedTxMapping<K, V>> nearMap; - - /** DHT mappings. */ - private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap; - - /** Logger. */ - private IgniteLogger log; - - /** Error. */ - private AtomicReference<Throwable> err = new AtomicReference<>(null); - - /** Replied flag. */ - private AtomicBoolean replied = new AtomicBoolean(false); - - /** All replies flag. */ - private AtomicBoolean mapped = new AtomicBoolean(false); - - /** Prepare reads. */ - private Iterable<IgniteTxEntry<K, V>> reads; - - /** Prepare writes. */ - private Iterable<IgniteTxEntry<K, V>> writes; - - /** Tx nodes. */ - private Map<UUID, Collection<UUID>> txNodes; - - /** Trackable flag. */ - private boolean trackable = true; - - /** Near mini future id. */ - private IgniteUuid nearMiniId; - - /** DHT versions map. */ - private Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap; - - /** {@code True} if this is last prepare operation for node. */ - private boolean last; - - /** IDs of backup nodes receiving last prepare request during this prepare. */ - private Collection<UUID> lastBackups; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtTxPrepareFuture() { - // No-op. - } - - /** - * @param cctx Context. - * @param tx Transaction. - * @param nearMiniId Near mini future id. - * @param dhtVerMap DHT versions map. - * @param last {@code True} if this is last prepare operation for node. - * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. - */ - public GridDhtTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridDhtTxLocalAdapter<K, V> tx, - IgniteUuid nearMiniId, Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, boolean last, Collection<UUID> lastBackups) { - super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() { - @Override public boolean collect(IgniteTxEx<K, V> e) { - return true; - } - - @Override public IgniteTxEx<K, V> reduce() { - // Nothing to aggregate. - return tx; - } - }); - - assert cctx != null; - - this.cctx = cctx; - this.tx = tx; - this.dhtVerMap = dhtVerMap; - this.last = last; - this.lastBackups = lastBackups; - - futId = IgniteUuid.randomUuid(); - - this.nearMiniId = nearMiniId; - - log = U.logger(ctx, logRef, GridDhtTxPrepareFuture.class); - - dhtMap = tx.dhtMap(); - nearMap = tx.nearMap(); - - assert dhtMap != null; - assert nearMap != null; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return futId; - } - - /** - * @return Near mini future id. - */ - public IgniteUuid nearMiniId() { - return nearMiniId; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** - * @return Involved nodes. - */ - @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - - /** {@inheritDoc} */ - @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) { - if (log.isDebugEnabled()) - log.debug("Transaction future received owner changed callback: " + entry); - - boolean ret = tx.hasWriteKey(entry.txKey()); - - return ret && mapIfLocked(); - } - - /** {@inheritDoc} */ - @Override public boolean trackable() { - return trackable; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - trackable = false; - } - - /** - * @return Transaction. - */ - GridDhtTxLocalAdapter<K, V> tx() { - return tx; - } - - /** - * @return {@code True} if all locks are owned. - */ - private boolean checkLocks() { - for (IgniteTxEntry<K, V> txEntry : tx.optimisticLockEntries()) { - while (true) { - GridCacheEntryEx<K, V> cached = txEntry.cached(); - - try { - // Don't compare entry against itself. - if (!cached.lockedLocally(tx.xidVersion())) { - if (log.isDebugEnabled()) - log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + cached + - ", tx=" + tx + ']'); - - return false; - } - - break; // While. - } - // Possible if entry cached within transaction is obsolete. - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); - - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.node().id().equals(nodeId)) { - f.onResult(new ClusterTopologyException("Remote node left grid (will retry): " + nodeId)); - - return true; - } - } - - return false; - } - - /** - * @param t Error. - */ - public void onError(Throwable t) { - if (err.compareAndSet(null, t)) { - tx.setRollbackOnly(); - - // TODO: GG-4005: - // TODO: as an improvement, at some point we must rollback right away. - // TODO: However, in this case need to make sure that reply is sent back - // TODO: even for non-existing transactions whenever finish request comes in. -// try { -// tx.rollback(); -// } -// catch (IgniteCheckedException ex) { -// U.error(log, "Failed to automatically rollback transaction: " + tx, ex); -// } -// - // If not local node. - if (!tx.nearNodeId().equals(cctx.localNodeId())) { - // Send reply back to near node. - GridCacheMessage<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(), tx.nearFutureId(), - nearMiniId, tx.xidVersion(), Collections.<Integer>emptySet(), t); - - try { - cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e); - - tx.rollbackAsync(); - } - } - - onComplete(); - } - } - - /** - * @param nodeId Sender. - * @param res Result. - */ - public void onResult(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) { - if (!isDone()) { - for (IgniteFuture<IgniteTxEx<K, V>> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.futureId().equals(res.miniId())) { - assert f.node().id().equals(nodeId); - - f.onResult(res); - - break; - } - } - } - } - } - - /** - * Marks all locks as ready for local transaction. - */ - private void readyLocks() { - // Ready all locks. - if (log.isDebugEnabled()) - log.debug("Marking all local candidates as ready: " + this); - - Iterable<IgniteTxEntry<K, V>> checkEntries = tx.groupLock() ? - Collections.singletonList(tx.groupLockEntry()) : writes; - - for (IgniteTxEntry<K, V> txEntry : checkEntries) { - if (txEntry.cached().isLocal()) - continue; - - while (true) { - GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached(); - - try { - GridCacheMvccCandidate<K> c = entry.readyLock(tx.xidVersion()); - - if (log.isDebugEnabled()) - log.debug("Current lock owner for entry [owner=" + c + ", entry=" + entry + ']'); - - break; // While. - } - // Possible if entry cached within transaction is obsolete. - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); - - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - } - - /** - * Checks if all ready locks are acquired and sends requests to remote nodes in this case. - * - * @return {@code True} if all locks are acquired, {@code false} otherwise. - */ - private boolean mapIfLocked() { - if (checkLocks()) { - prepare0(); - - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean onDone(IgniteTxEx<K, V> tx0, Throwable err) { - assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " + - "pending mini futures: " + this; - - this.err.compareAndSet(null, err); - - if (replied.compareAndSet(false, true)) { - try { - // Must clear prepare future before response is sent or listeners are notified. - if (tx.optimistic()) - tx.clearPrepareFuture(this); - - if (!tx.nearNodeId().equals(cctx.localNodeId())) { - // Send reply back to originating near node. - GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(), - tx.nearFutureId(), nearMiniId, tx.xidVersion(), tx.invalidPartitions(), this.err.get()); - - addDhtValues(res); - - GridCacheVersion min = tx.minVersion(); - - res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min)); - - res.pending(localDhtPendingVersions(tx.writeEntries(), min)); - - cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - - return true; - } - catch (IgniteCheckedException e) { - onError(e); - - return true; - } - finally { - // Will call super.onDone(). - onComplete(); - } - } - else { - // Other thread is completing future. Wait for it to complete. - try { - get(); - } - catch (IgniteInterruptedException e) { - onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e)); - } - catch (IgniteCheckedException ignored) { - // No-op, get() was just synchronization. - } - - return false; - } - } - - /** - * @param res Response being sent. - */ - private void addDhtValues(GridNearTxPrepareResponse<K, V> res) { - // Interceptor on near node needs old values to execute callbacks. - if (!F.isEmpty(writes)) { - for (IgniteTxEntry<K, V> e : writes) { - IgniteTxEntry<K, V> txEntry = tx.entry(e.txKey()); - - assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']'; - - while (true) { - try { - GridCacheEntryEx<K, V> entry = txEntry.cached(); - - GridCacheVersion dhtVer = entry.version(); - - V val0 = null; - byte[] valBytes0 = null; - - GridCacheValueBytes valBytesTuple = entry.valueBytes(); - - if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V) valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); - } - else - val0 = entry.rawGet(); - - if (val0 != null || valBytes0 != null) - res.addOwnedValue(txEntry.txKey(), dhtVer, val0, valBytes0); - - break; - } - catch (GridCacheEntryRemovedException ignored) { - // Retry. - } - } - } - } - - for (Map.Entry<IgniteTxKey<K>, GridCacheVersion> ver : dhtVerMap.entrySet()) { - IgniteTxEntry<K, V> txEntry = tx.entry(ver.getKey()); - - if (res.hasOwnedValue(ver.getKey())) - continue; - - while (true) { - try { - GridCacheEntryEx<K, V> entry = txEntry.cached(); - - GridCacheVersion dhtVer = entry.version(); - - if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) { - V val0 = null; - byte[] valBytes0 = null; - - GridCacheValueBytes valBytesTuple = entry.valueBytes(); - - if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V)valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); - } - else - val0 = entry.rawGet(); - - res.addOwnedValue(txEntry.txKey(), dhtVer, val0, valBytes0); - } - - break; - } - catch (GridCacheEntryRemovedException ignored) { - // Retry. - } - } - } - } - - /** - * @param f Future. - * @return {@code True} if mini-future. - */ - private boolean isMini(IgniteFuture<?> f) { - return f.getClass().equals(MiniFuture.class); - } - - /** - * Completeness callback. - * - * @return {@code True} if {@code done} flag was changed as a result of this call. - */ - private boolean onComplete() { - if (last || tx.isSystemInvalidate()) - tx.state(PREPARED); - - if (super.onDone(tx, err.get())) { - // Don't forget to clean up. - cctx.mvcc().removeFuture(this); - - return true; - } - - return false; - } - - /** - * Completes this future. - */ - public void complete() { - onComplete(); - } - - /** - * Initializes future. - * - * @param reads Read entries. - * @param writes Write entries. - * @param txNodes Transaction nodes mapping. - */ - public void prepare(Iterable<IgniteTxEntry<K, V>> reads, Iterable<IgniteTxEntry<K, V>> writes, - Map<UUID, Collection<UUID>> txNodes) { - if (tx.empty()) { - tx.setRollbackOnly(); - - onDone(tx); - } - - this.reads = reads; - this.writes = writes; - this.txNodes = txNodes; - - readyLocks(); - - mapIfLocked(); - } - - /** - * @param backupId Backup node ID. - * @return {@code True} if backup node receives last prepare request for this transaction. - */ - private boolean lastBackup(UUID backupId) { - return lastBackups != null && lastBackups.contains(backupId); - } - - /** - * - */ - private void prepare0() { - if (!mapped.compareAndSet(false, true)) - return; - - try { - Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>(); - Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>(); - - boolean hasRemoteNodes = false; - - // Assign keys to primary nodes. - if (!F.isEmpty(reads)) { - for (IgniteTxEntry<K, V> read : reads) - hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap); - } - - if (!F.isEmpty(writes)) { - for (IgniteTxEntry<K, V> write : writes) - hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap); - } - - if (isDone()) - return; - - tx.needsCompletedVersions(hasRemoteNodes); - - // Create mini futures. - for (GridDistributedTxMapping<K, V> dhtMapping : futDhtMap.values()) { - assert !dhtMapping.empty(); - - ClusterNode n = dhtMapping.node(); - - assert !n.isLocal(); - - GridDistributedTxMapping<K, V> nearMapping = futNearMap.get(n.id()); - - MiniFuture fut = new MiniFuture(n.id(), dhtMap.get(n.id()), nearMap.get(n.id())); - - add(fut); // Append new future. - - Collection<IgniteTxEntry<K, V>> nearWrites = nearMapping == null ? null : nearMapping.writes(); - - GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - dhtMapping.writes(), - nearWrites, - tx.groupLockKey(), - tx.partitionLock(), - txNodes, - tx.nearXidVersion(), - lastBackup(n.id()), - tx.subjectId(), - tx.taskNameHash()); - - int idx = 0; - - for (IgniteTxEntry<K, V> entry : dhtMapping.writes()) { - try { - GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); - - GridCacheMvccCandidate<K> added = cached.candidate(version()); - - assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; - - if (added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - - req.invalidateNearEntry(idx, cached.readerId(n.id()) != null); - - if (cached.isNewLocked()) - req.markKeyForPreload(idx); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } - - idx++; - } - - if (!F.isEmpty(nearWrites)) { - for (IgniteTxEntry<K, V> entry : nearWrites) { - try { - GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); - - assert added != null; - assert added.dhtLocal(); - - if (added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } - } - } - - //noinspection TryWithIdenticalCatches - try { - cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (ClusterTopologyException e) { - fut.onResult(e); - } - catch (IgniteCheckedException e) { - fut.onResult(e); - } - } - - for (GridDistributedTxMapping<K, V> nearMapping : futNearMap.values()) { - if (!futDhtMap.containsKey(nearMapping.node().id())) { - assert nearMapping.writes() != null; - - MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping); - - add(fut); // Append new future. - - GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - null, - nearMapping.writes(), - tx.groupLockKey(), - tx.partitionLock(), - null, - tx.nearXidVersion(), - false, - tx.subjectId(), - tx.taskNameHash()); - - for (IgniteTxEntry<K, V> entry : nearMapping.writes()) { - try { - GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); - - assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; - - if (added != null && added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } - } - - //noinspection TryWithIdenticalCatches - try { - cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (ClusterTopologyException e) { - fut.onResult(e); - } - catch (IgniteCheckedException e) { - fut.onResult(e); - } - } - } - } - finally { - markInitialized(); - } - } - - /** - * @param entry Transaction entry. - * @param futDhtMap DHT mapping. - * @param futNearMap Near mapping. - * @return {@code True} if mapped. - */ - private boolean map( - IgniteTxEntry<K, V> entry, - Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap, - Map<UUID, GridDistributedTxMapping<K, V>> futNearMap) { - if (entry.cached().isLocal()) - return false; - - GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); - - boolean ret; - - GridCacheContext<K, V> cacheCtx = entry.context(); - - GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); - - while (true) { - try { - Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion()); - - if (log.isDebugEnabled()) - log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) + - ", entry=" + entry + ']'); - - Collection<UUID> readers = cached.readers(); - - Collection<ClusterNode> nearNodes = null; - - if (!F.isEmpty(readers)) { - nearNodes = cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId()))); - - if (log.isDebugEnabled()) - log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) + - ", entry=" + entry + ']'); - } - else if (log.isDebugEnabled()) - log.debug("Entry has no near readers: " + entry); - - // Exclude local node. - ret = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap); - - // Exclude DHT nodes. - ret |= map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - cached = dht.entryExx(entry.key()); - - entry.cached(cached, cached.keyBytes()); - } - } - - return ret; - } - - /** - * @param entry Entry. - * @param nodes Nodes. - * @param globalMap Map. - * @param locMap Exclude map. - * @return {@code True} if mapped. - */ - private boolean map(IgniteTxEntry<K, V> entry, Iterable<ClusterNode> nodes, - Map<UUID, GridDistributedTxMapping<K, V>> globalMap, Map<UUID, GridDistributedTxMapping<K, V>> locMap) { - boolean ret = false; - - if (nodes != null) { - for (ClusterNode n : nodes) { - GridDistributedTxMapping<K, V> global = globalMap.get(n.id()); - - if (global == null) - globalMap.put(n.id(), global = new GridDistributedTxMapping<>(n)); - - global.add(entry); - - GridDistributedTxMapping<K, V> loc = locMap.get(n.id()); - - if (loc == null) - locMap.put(n.id(), loc = new GridDistributedTxMapping<>(n)); - - loc.add(entry); - - ret = true; - } - } - - return ret; - } - - /** - * Collects versions of pending candidates versions less than base. - * - * @param entries Tx entries to process. - * @param baseVer Base version. - * @return Collection of pending candidates versions. - */ - private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<IgniteTxEntry<K, V>> entries, - GridCacheVersion baseVer) { - Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5); - - for (IgniteTxEntry<K, V> entry : entries) { - try { - for (GridCacheMvccCandidate cand : entry.cached().localCandidates()) { - if (cand.version().isLess(baseVer)) - lessPending.add(cand.version()); - } - } - catch (GridCacheEntryRemovedException ignored) { - // No-op, no candidates. - } - } - - return lessPending; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtTxPrepareFuture.class, this, "super", super.toString()); - } - - /** - * Mini-future for get operations. Mini-futures are only waiting on a single - * node as opposed to multiple nodes. - */ - private class MiniFuture extends GridFutureAdapter<IgniteTxEx<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Node ID. */ - private UUID nodeId; - - /** DHT mapping. */ - @GridToStringInclude - private GridDistributedTxMapping<K, V> dhtMapping; - - /** Near mapping. */ - @GridToStringInclude - private GridDistributedTxMapping<K, V> nearMapping; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public MiniFuture() { - super(cctx.kernalContext()); - } - - /** - * @param nodeId Node ID. - * @param dhtMapping Mapping. - * @param nearMapping nearMapping. - */ - MiniFuture(UUID nodeId, GridDistributedTxMapping<K, V> dhtMapping, GridDistributedTxMapping<K, V> nearMapping) { - super(cctx.kernalContext()); - - assert dhtMapping == null || nearMapping == null || dhtMapping.node() == nearMapping.node(); - - this.nodeId = nodeId; - this.dhtMapping = dhtMapping; - this.nearMapping = nearMapping; - } - - /** - * @return Future ID. - */ - IgniteUuid futureId() { - return futId; - } - - /** - * @return Node ID. - */ - public ClusterNode node() { - return dhtMapping != null ? dhtMapping.node() : nearMapping.node(); - } - - /** - * @param e Error. - */ - void onResult(Throwable e) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - // Fail. - onDone(e); - } - - /** - * @param e Node failure. - */ - void onResult(ClusterTopologyException e) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this); - - if (tx != null) - tx.removeMapping(nodeId); - - onDone(tx); - } - - /** - * @param res Result callback. - */ - void onResult(GridDhtTxPrepareResponse<K, V> res) { - if (res.error() != null) - // Fail the whole compound future. - onError(res.error()); - else { - // Process evicted readers (no need to remap). - if (nearMapping != null && !F.isEmpty(res.nearEvicted())) { - nearMapping.evictReaders(res.nearEvicted()); - - for (IgniteTxEntry<K, V> entry : nearMapping.entries()) { - if (res.nearEvicted().contains(entry.txKey())) { - while (true) { - try { - GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); - - cached.removeReader(nearMapping.node().id(), res.messageId()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - GridCacheEntryEx<K, V> e = entry.context().cache().peekEx(entry.key()); - - if (e == null) - break; - - entry.cached(e, entry.keyBytes()); - } - } - } - } - } - - // Process invalid partitions (no need to remap). - if (!F.isEmpty(res.invalidPartitions())) { - for (Iterator<IgniteTxEntry<K, V>> it = dhtMapping.entries().iterator(); it.hasNext();) { - IgniteTxEntry<K, V> entry = it.next(); - - if (res.invalidPartitions().contains(entry.cached().partition())) { - it.remove(); - - if (log.isDebugEnabled()) - log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + - ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); - } - } - - if (dhtMapping.empty()) { - dhtMap.remove(nodeId); - - if (log.isDebugEnabled()) - log.debug("Removed mapping for node entirely because all partitions are invalid [nodeId=" + - nodeId + ", tx=" + tx + ']'); - } - } - - long topVer = tx.topologyVersion(); - - boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED); - - for (GridCacheEntryInfo<K, V> info : res.preloadEntries()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId()); - - while (true) { - GridCacheEntryEx<K, V> entry = cacheCtx.cache().entryEx(info.key()); - - GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE; - - try { - if (entry.initialValue(info.value(), info.valueBytes(), info.version(), - info.ttl(), info.expireTime(), true, topVer, drType)) { - if (rec && !entry.isInternal()) - cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_PRELOAD_OBJECT_LOADED, info.value(), true, null, - false, null, null, null); - } - - break; - } - catch (IgniteCheckedException e) { - // Fail the whole thing. - onDone(e); - - return; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to set entry initial value (entry is obsolete, " + - "will retry): " + entry); - } - } - } - - // Finish mini future. - onDone(tx); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java deleted file mode 100644 index 127d07e..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ /dev/null @@ -1,613 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * DHT prepare request. - */ -public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequest<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Max order. */ - private UUID nearNodeId; - - /** Future ID. */ - private IgniteUuid futId; - - /** Mini future ID. */ - private IgniteUuid miniId; - - /** Topology version. */ - private long topVer; - - /** Invalidate near entries flags. */ - private BitSet invalidateNearEntries; - - /** Near writes. */ - @GridToStringInclude - @GridDirectTransient - private Collection<IgniteTxEntry<K, V>> nearWrites; - - /** Serialized near writes. */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> nearWritesBytes; - - /** Owned versions by key. */ - @GridToStringInclude - @GridDirectTransient - private Map<IgniteTxKey<K>, GridCacheVersion> owned; - - /** Owned versions bytes. */ - private byte[] ownedBytes; - - /** Near transaction ID. */ - private GridCacheVersion nearXidVer; - - /** {@code True} if this is last prepare request for node. */ - private boolean last; - - /** Subject ID. */ - @GridDirectVersion(1) - private UUID subjId; - - /** Task name hash. */ - @GridDirectVersion(2) - private int taskNameHash; - - @GridDirectVersion(3) - /** Preload keys. */ - private BitSet preloadKeys; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtTxPrepareRequest() { - // No-op. - } - - /** - * @param futId Future ID. - * @param miniId Mini future ID. - * @param topVer Topology version. - * @param tx Transaction. - * @param dhtWrites DHT writes. - * @param nearWrites Near writes. - * @param grpLockKey Group lock key if preparing group-lock transaction. - * @param partLock {@code True} if group-lock transaction locks partition. - * @param txNodes Transaction nodes mapping. - * @param nearXidVer Near transaction ID. - * @param last {@code True} if this is last prepare request for node. - */ - public GridDhtTxPrepareRequest( - IgniteUuid futId, - IgniteUuid miniId, - long topVer, - GridDhtTxLocalAdapter<K, V> tx, - Collection<IgniteTxEntry<K, V>> dhtWrites, - Collection<IgniteTxEntry<K, V>> nearWrites, - IgniteTxKey grpLockKey, - boolean partLock, - Map<UUID, Collection<UUID>> txNodes, - GridCacheVersion nearXidVer, - boolean last, - UUID subjId, - int taskNameHash) { - super(tx, null, dhtWrites, grpLockKey, partLock, txNodes); - - assert futId != null; - assert miniId != null; - - this.topVer = topVer; - this.futId = futId; - this.nearWrites = nearWrites; - this.miniId = miniId; - this.nearXidVer = nearXidVer; - this.last = last; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - - invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size()); - - nearNodeId = tx.nearNodeId(); - } - - /** - * @return {@code True} if this is last prepare request for node. - */ - public boolean last() { - return last; - } - - /** - * @return Near transaction ID. - */ - public GridCacheVersion nearXidVersion() { - return nearXidVer; - } - - /** {@inheritDoc} */ - @Override public boolean allowForStartup() { - return true; - } - - /** - * @return Near node ID. - */ - public UUID nearNodeId() { - return nearNodeId; - } - - /** - * @return Subject ID. - */ - @Nullable public UUID subjectId() { - return subjId; - } - - /** - * @return Task name hash. - */ - public int taskNameHash() { - return taskNameHash; - } - - /** - * @return Near writes. - */ - public Collection<IgniteTxEntry<K, V>> nearWrites() { - return nearWrites == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : nearWrites; - } - - /** - * @param idx Entry index to set invalidation flag. - * @param invalidate Invalidation flag value. - */ - public void invalidateNearEntry(int idx, boolean invalidate) { - invalidateNearEntries.set(idx, invalidate); - } - - /** - * @param idx Index to get invalidation flag value. - * @return Invalidation flag value. - */ - public boolean invalidateNearEntry(int idx) { - return invalidateNearEntries.get(idx); - } - - /** - * Marks last added key for preloading. - */ - public void markKeyForPreload(int idx) { - if (preloadKeys == null) - preloadKeys = new BitSet(); - - preloadKeys.set(idx, true); - } - - /** - * Checks whether entry info should be sent to primary node from backup. - * - * @param idx Index. - * @return {@code True} if value should be sent, {@code false} otherwise. - */ - public boolean needPreloadKey(int idx) { - return preloadKeys != null && preloadKeys.get(idx); - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Mini future ID. - */ - public IgniteUuid miniId() { - return miniId; - } - - /** - * @return Topology version. - */ - @Override public long topologyVersion() { - return topVer; - } - - /** - * Sets owner and its mapped version. - * - * @param key Key. - * @param ownerMapped Owner mapped version. - */ - public void owned(IgniteTxKey<K> key, GridCacheVersion ownerMapped) { - if (owned == null) - owned = new GridLeanMap<>(3); - - owned.put(key, ownerMapped); - } - - /** - * @return Owned versions map. - */ - public Map<IgniteTxKey<K>, GridCacheVersion> owned() { - return owned; - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (ownedBytes == null && owned != null) { - ownedBytes = CU.marshal(ctx, owned); - - if (ctx.deploymentEnabled()) { - for (IgniteTxKey<K> k : owned.keySet()) - prepareObject(k, ctx); - } - } - - if (nearWrites != null) { - marshalTx(nearWrites, ctx); - - nearWritesBytes = new ArrayList<>(nearWrites.size()); - - for (IgniteTxEntry<K, V> e : nearWrites) - nearWritesBytes.add(ctx.marshaller().marshal(e)); - } - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (ownedBytes != null && owned == null) - owned = ctx.marshaller().unmarshal(ownedBytes, ldr); - - if (nearWritesBytes != null) { - nearWrites = new ArrayList<>(nearWritesBytes.size()); - - for (byte[] arr : nearWritesBytes) - nearWrites.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr)); - - unmarshalTx(nearWrites, true, ctx, ldr); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtTxPrepareRequest.class, this, "super", super.toString()); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDhtTxPrepareRequest _clone = new GridDhtTxPrepareRequest(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDhtTxPrepareRequest _clone = (GridDhtTxPrepareRequest)_msg; - - _clone.nearNodeId = nearNodeId; - _clone.futId = futId; - _clone.miniId = miniId; - _clone.topVer = topVer; - _clone.invalidateNearEntries = invalidateNearEntries; - _clone.nearWrites = nearWrites; - _clone.nearWritesBytes = nearWritesBytes; - _clone.owned = owned; - _clone.ownedBytes = ownedBytes; - _clone.nearXidVer = nearXidVer; - _clone.last = last; - _clone.subjId = subjId; - _clone.taskNameHash = taskNameHash; - _clone.preloadKeys = preloadKeys; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 22: - if (!commState.putGridUuid(futId)) - return false; - - commState.idx++; - - case 23: - if (!commState.putBitSet(invalidateNearEntries)) - return false; - - commState.idx++; - - case 24: - if (!commState.putBoolean(last)) - return false; - - commState.idx++; - - case 25: - if (!commState.putGridUuid(miniId)) - return false; - - commState.idx++; - - case 26: - if (!commState.putUuid(nearNodeId)) - return false; - - commState.idx++; - - case 27: - if (nearWritesBytes != null) { - if (commState.it == null) { - if (!commState.putInt(nearWritesBytes.size())) - return false; - - commState.it = nearWritesBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 28: - if (!commState.putCacheVersion(nearXidVer)) - return false; - - commState.idx++; - - case 29: - if (!commState.putByteArray(ownedBytes)) - return false; - - commState.idx++; - - case 30: - if (!commState.putLong(topVer)) - return false; - - commState.idx++; - - case 31: - if (!commState.putUuid(subjId)) - return false; - - commState.idx++; - - case 32: - if (!commState.putInt(taskNameHash)) - return false; - - commState.idx++; - - case 33: - if (!commState.putBitSet(preloadKeys)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 22: - IgniteUuid futId0 = commState.getGridUuid(); - - if (futId0 == GRID_UUID_NOT_READ) - return false; - - futId = futId0; - - commState.idx++; - - case 23: - BitSet invalidateNearEntries0 = commState.getBitSet(); - - if (invalidateNearEntries0 == BIT_SET_NOT_READ) - return false; - - invalidateNearEntries = invalidateNearEntries0; - - commState.idx++; - - case 24: - if (buf.remaining() < 1) - return false; - - last = commState.getBoolean(); - - commState.idx++; - - case 25: - IgniteUuid miniId0 = commState.getGridUuid(); - - if (miniId0 == GRID_UUID_NOT_READ) - return false; - - miniId = miniId0; - - commState.idx++; - - case 26: - UUID nearNodeId0 = commState.getUuid(); - - if (nearNodeId0 == UUID_NOT_READ) - return false; - - nearNodeId = nearNodeId0; - - commState.idx++; - - case 27: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (nearWritesBytes == null) - nearWritesBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - nearWritesBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 28: - GridCacheVersion nearXidVer0 = commState.getCacheVersion(); - - if (nearXidVer0 == CACHE_VER_NOT_READ) - return false; - - nearXidVer = nearXidVer0; - - commState.idx++; - - case 29: - byte[] ownedBytes0 = commState.getByteArray(); - - if (ownedBytes0 == BYTE_ARR_NOT_READ) - return false; - - ownedBytes = ownedBytes0; - - commState.idx++; - - case 30: - if (buf.remaining() < 8) - return false; - - topVer = commState.getLong(); - - commState.idx++; - - case 31: - UUID subjId0 = commState.getUuid(); - - if (subjId0 == UUID_NOT_READ) - return false; - - subjId = subjId0; - - commState.idx++; - - case 32: - if (buf.remaining() < 4) - return false; - - taskNameHash = commState.getInt(); - - commState.idx++; - - case 33: - BitSet preloadKeys0 = commState.getBitSet(); - - if (preloadKeys0 == BIT_SET_NOT_READ) - return false; - - preloadKeys = preloadKeys0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 33; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java deleted file mode 100644 index e4ae3ac..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java +++ /dev/null @@ -1,471 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * DHT transaction prepare response. - */ -public class GridDhtTxPrepareResponse<K, V> extends GridDistributedTxPrepareResponse<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Evicted readers. */ - @GridToStringInclude - @GridDirectTransient - private Collection<IgniteTxKey<K>> nearEvicted; - - /** */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> nearEvictedBytes; - - /** Future ID. */ - private IgniteUuid futId; - - /** Mini future ID. */ - private IgniteUuid miniId; - - /** Invalid partitions. */ - @GridToStringInclude - @GridDirectCollection(int.class) - private Collection<Integer> invalidParts; - - @GridDirectTransient - /** Preload entries. */ - private List<GridCacheEntryInfo<K, V>> preloadEntries; - - /** */ - @GridDirectCollection(byte[].class) - @GridDirectVersion(1) - private List<byte[]> preloadEntriesBytes; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridDhtTxPrepareResponse() { - // No-op. - } - - /** - * @param xid Xid version. - * @param futId Future ID. - * @param miniId Mini future ID. - */ - public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) { - super(xid); - - assert futId != null; - assert miniId != null; - - this.futId = futId; - this.miniId = miniId; - } - - /** - * @param xid Xid version. - * @param futId Future ID. - * @param miniId Mini future ID. - * @param err Error. - */ - public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err) { - super(xid, err); - - assert futId != null; - assert miniId != null; - - this.futId = futId; - this.miniId = miniId; - } - - /** - * @return Evicted readers. - */ - public Collection<IgniteTxKey<K>> nearEvicted() { - return nearEvicted; - } - - /** - * @param nearEvicted Evicted readers. - */ - public void nearEvicted(Collection<IgniteTxKey<K>> nearEvicted) { - this.nearEvicted = nearEvicted; - } - - /** - * @param nearEvictedBytes Near evicted bytes. - */ - public void nearEvictedBytes(Collection<byte[]> nearEvictedBytes) { - this.nearEvictedBytes = nearEvictedBytes; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Mini future ID. - */ - public IgniteUuid miniId() { - return miniId; - } - - /** - * @return Invalid partitions. - */ - public Collection<Integer> invalidPartitions() { - return invalidParts; - } - - /** - * @param invalidParts Invalid partitions. - */ - public void invalidPartitions(Collection<Integer> invalidParts) { - this.invalidParts = invalidParts; - } - - /** - * Gets preload entries found on backup node. - * - * @return Collection of entry infos need to be preloaded. - */ - public Collection<GridCacheEntryInfo<K, V>> preloadEntries() { - return preloadEntries == null ? Collections.<GridCacheEntryInfo<K, V>>emptyList() : preloadEntries; - } - - /** - * Adds preload entry. - * - * @param info Info to add. - */ - public void addPreloadEntry(GridCacheEntryInfo<K, V> info) { - if (preloadEntries == null) - preloadEntries = new ArrayList<>(); - - preloadEntries.add(info); - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (nearEvictedBytes == null) - nearEvictedBytes = marshalCollection(nearEvicted, ctx); - - if (preloadEntriesBytes == null && preloadEntries != null) - preloadEntriesBytes = marshalCollection(preloadEntries, ctx); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - // Unmarshal even if deployment is disabled, since we could get bytes initially. - if (nearEvicted == null && nearEvictedBytes != null) - nearEvicted = unmarshalCollection(nearEvictedBytes, ctx, ldr); - - if (preloadEntries == null && preloadEntriesBytes != null) - preloadEntries = unmarshalCollection(preloadEntriesBytes, ctx, ldr); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString()); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDhtTxPrepareResponse _clone = new GridDhtTxPrepareResponse(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDhtTxPrepareResponse _clone = (GridDhtTxPrepareResponse)_msg; - - _clone.nearEvicted = nearEvicted; - _clone.nearEvictedBytes = nearEvictedBytes; - _clone.futId = futId; - _clone.miniId = miniId; - _clone.invalidParts = invalidParts; - _clone.preloadEntries = preloadEntries; - _clone.preloadEntriesBytes = preloadEntriesBytes; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 10: - if (!commState.putGridUuid(futId)) - return false; - - commState.idx++; - - case 11: - if (invalidParts != null) { - if (commState.it == null) { - if (!commState.putInt(invalidParts.size())) - return false; - - commState.it = invalidParts.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putInt((int)commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 12: - if (!commState.putGridUuid(miniId)) - return false; - - commState.idx++; - - case 13: - if (nearEvictedBytes != null) { - if (commState.it == null) { - if (!commState.putInt(nearEvictedBytes.size())) - return false; - - commState.it = nearEvictedBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 14: - if (preloadEntriesBytes != null) { - if (commState.it == null) { - if (!commState.putInt(preloadEntriesBytes.size())) - return false; - - commState.it = preloadEntriesBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 10: - IgniteUuid futId0 = commState.getGridUuid(); - - if (futId0 == GRID_UUID_NOT_READ) - return false; - - futId = futId0; - - commState.idx++; - - case 11: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (invalidParts == null) - invalidParts = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - if (buf.remaining() < 4) - return false; - - int _val = commState.getInt(); - - invalidParts.add((Integer)_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 12: - IgniteUuid miniId0 = commState.getGridUuid(); - - if (miniId0 == GRID_UUID_NOT_READ) - return false; - - miniId = miniId0; - - commState.idx++; - - case 13: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (nearEvictedBytes == null) - nearEvictedBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - nearEvictedBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 14: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (preloadEntriesBytes == null) - preloadEntriesBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - preloadEntriesBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 34; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java deleted file mode 100644 index 3352278..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import javax.cache.processor.*; -import java.io.*; -import java.util.*; - -import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; - -/** - * Transaction created by system implicitly on remote nodes. - */ -public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Near node ID. */ - private UUID nearNodeId; - - /** Remote future ID. */ - private IgniteUuid rmtFutId; - - /** Near transaction ID. */ - private GridCacheVersion nearXidVer; - - /** Transaction nodes mapping (primary node -> related backup nodes). */ - private Map<UUID, Collection<UUID>> txNodes; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtTxRemote() { - // No-op. - } - - /** - * This constructor is meant for optimistic transactions. - * - * @param nearNodeId Near node ID. - * @param rmtFutId Remote future ID. - * @param nodeId Node ID. - * @param rmtThreadId Remote thread ID. - * @param topVer Topology version. - * @param xidVer XID version. - * @param commitVer Commit version. - * @param sys System flag. - * @param concurrency Concurrency level (should be pessimistic). - * @param isolation Transaction isolation. - * @param invalidate Invalidate flag. - * @param timeout Timeout. - * @param ctx Cache context. - * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param nearXidVer Near transaction ID. - * @param txNodes Transaction nodes mapping. - */ - public GridDhtTxRemote( - GridCacheSharedContext<K, V> ctx, - UUID nearNodeId, - IgniteUuid rmtFutId, - UUID nodeId, - long rmtThreadId, - long topVer, - GridCacheVersion xidVer, - GridCacheVersion commitVer, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - boolean invalidate, - long timeout, - int txSize, - @Nullable IgniteTxKey grpLockKey, - GridCacheVersion nearXidVer, - Map<UUID, Collection<UUID>> txNodes, - @Nullable UUID subjId, - int taskNameHash - ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, - grpLockKey, subjId, taskNameHash); - - assert nearNodeId != null; - assert rmtFutId != null; - - this.nearNodeId = nearNodeId; - this.rmtFutId = rmtFutId; - this.nearXidVer = nearXidVer; - this.txNodes = txNodes; - - readMap = Collections.emptyMap(); - - writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f); - - topologyVersion(topVer); - } - - /** - * This constructor is meant for pessimistic transactions. - * - * @param nearNodeId Near node ID. - * @param rmtFutId Remote future ID. - * @param nodeId Node ID. - * @param nearXidVer Near transaction ID. - * @param rmtThreadId Remote thread ID. - * @param topVer Topology version. - * @param xidVer XID version. - * @param commitVer Commit version. - * @param sys System flag. - * @param concurrency Concurrency level (should be pessimistic). - * @param isolation Transaction isolation. - * @param invalidate Invalidate flag. - * @param timeout Timeout. - * @param ctx Cache context. - * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if transaction is group-lock. - */ - public GridDhtTxRemote( - GridCacheSharedContext<K, V> ctx, - UUID nearNodeId, - IgniteUuid rmtFutId, - UUID nodeId, - GridCacheVersion nearXidVer, - long rmtThreadId, - long topVer, - GridCacheVersion xidVer, - GridCacheVersion commitVer, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - boolean invalidate, - long timeout, - int txSize, - @Nullable IgniteTxKey grpLockKey, - @Nullable UUID subjId, - int taskNameHash - ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, - grpLockKey, subjId, taskNameHash); - - assert nearNodeId != null; - assert rmtFutId != null; - - this.nearXidVer = nearXidVer; - this.nearNodeId = nearNodeId; - this.rmtFutId = rmtFutId; - - readMap = Collections.emptyMap(); - writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f); - - topologyVersion(topVer); - } - - /** {@inheritDoc} */ - @Override public boolean dht() { - return true; - } - - /** {@inheritDoc} */ - @Override public UUID eventNodeId() { - return nearNodeId(); - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> masterNodeIds() { - return Arrays.asList(nearNodeId, nodeId); - } - - /** {@inheritDoc} */ - @Override public UUID otherNodeId() { - return nearNodeId; - } - - /** {@inheritDoc} */ - @Override public boolean enforceSerializable() { - return false; // Serializable will be enforced on primary mode. - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion nearXidVersion() { - return nearXidVer; - } - - /** {@inheritDoc} */ - @Override public Map<UUID, Collection<UUID>> transactionNodes() { - return txNodes; - } - - /** - * @return Near node ID. - */ - UUID nearNodeId() { - return nearNodeId; - } - - /** - * @return Remote future ID. - */ - IgniteUuid remoteFutureId() { - return rmtFutId; - } - - /** {@inheritDoc} */ - @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) { - if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || cctx.localNodeId().equals(nearNodeId)) - return false; - - if (cacheCtx.config().getBackups() == 0) - return true; - - // Check if we are on the backup node. - return !cacheCtx.affinity().backups(key, topVer).contains(cctx.localNode()); - } - - /** {@inheritDoc} */ - @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part) { - super.addInvalidPartition(cacheCtx, part); - - for (Iterator<IgniteTxEntry<K, V>> it = writeMap.values().iterator(); it.hasNext();) { - IgniteTxEntry<K, V> e = it.next(); - - GridCacheEntryEx<K, V> cached = e.cached(); - - if (cached != null) { - if (cached.partition() == part) - it.remove(); - } - else if (cacheCtx.affinity().partition(e.key()) == part) - it.remove(); - } - } - - /** - * @param entry Write entry. - * @param ldr Class loader. - * @throws IgniteCheckedException If failed. - */ - public void addWrite(IgniteTxEntry<K, V> entry, ClassLoader ldr) throws IgniteCheckedException { - entry.unmarshal(cctx, false, ldr); - - GridCacheContext<K, V> cacheCtx = entry.context(); - - try { - GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(entry.key(), topologyVersion()); - - checkInternal(entry.txKey()); - - // Initialize cache entry. - entry.cached(cached, entry.keyBytes()); - - writeMap.put(entry.txKey(), entry); - - addExplicit(entry); - } - catch (GridDhtInvalidPartitionException e) { - addInvalidPartition(cacheCtx, e.partition()); - } - } - - /** - * @param cacheCtx Cache context. - * @param op Write operation. - * @param key Key to add to write set. - * @param keyBytes Key bytes. - * @param val Value. - * @param valBytes Value bytes. - * @param drVer Data center replication version. - * @param entryProcessors Entry processors. - * @param ttl TTL. - */ - public void addWrite(GridCacheContext<K, V> cacheCtx, - GridCacheOperation op, - IgniteTxKey<K> key, - byte[] keyBytes, - @Nullable V val, - @Nullable byte[] valBytes, - @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors, - @Nullable GridCacheVersion drVer, - long ttl) { - checkInternal(key); - - if (isSystemInvalidate()) - return; - - GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(key.key(), topologyVersion()); - - IgniteTxEntry<K, V> txEntry = new IgniteTxEntry<>(cacheCtx, - this, - op, - val, - ttl, - -1L, - cached, - drVer); - - txEntry.keyBytes(keyBytes); - txEntry.valueBytes(valBytes); - txEntry.entryProcessors(entryProcessors); - - writeMap.put(key, txEntry); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridDhtTxRemote.class, this, "super", super.toString()); - } -}