http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java deleted file mode 100644 index 003315b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java +++ /dev/null @@ -1,3178 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.security.*; -import org.apache.ignite.portables.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.near.*; -import org.gridgain.grid.kernal.processors.cache.dr.*; -import org.gridgain.grid.kernal.processors.dr.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.future.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.transactions.IgniteTxState.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; -import static org.gridgain.grid.kernal.processors.dr.GridDrType.*; - -/** - * Transaction adapter for cache transactions. - */ -public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K, V> - implements GridCacheTxLocalEx<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Per-transaction read map. */ - @GridToStringExclude - protected Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> txMap; - - /** Read view on transaction map. */ - @GridToStringExclude - protected GridCacheTxMap<K, V> readView; - - /** Write view on transaction map. */ - @GridToStringExclude - protected GridCacheTxMap<K, V> writeView; - - /** Minimal version encountered (either explicit lock or XID of this transaction). */ - protected GridCacheVersion minVer; - - /** Flag indicating with TM commit happened. */ - protected AtomicBoolean doneFlag = new AtomicBoolean(false); - - /** Committed versions, relative to base. */ - private Collection<GridCacheVersion> committedVers = Collections.emptyList(); - - /** Rolled back versions, relative to base. */ - private Collection<GridCacheVersion> rolledbackVers = Collections.emptyList(); - - /** Base for completed versions. */ - private GridCacheVersion completedBase; - - /** Flag indicating partition lock in group lock transaction. */ - private boolean partLock; - - /** Flag indicating that transformed values should be sent to remote nodes. */ - private boolean sndTransformedVals; - - /** Commit error. */ - protected AtomicReference<Throwable> commitErr = new AtomicReference<>(); - - /** Active cache IDs. */ - protected Set<Integer> activeCacheIds = new HashSet<>(); - - /** - * Empty constructor required for {@link Externalizable}. - */ - protected GridCacheTxLocalAdapter() { - // No-op. - } - - /** - * @param cctx Cache registry. - * @param xidVer Transaction ID. - * @param implicit {@code True} if transaction was implicitly started by the system, - * {@code false} if it was started explicitly by user. - * @param implicitSingle {@code True} if transaction is implicit with only one key. - * @param sys System flag. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition. - */ - protected GridCacheTxLocalAdapter( - GridCacheSharedContext<K, V> cctx, - GridCacheVersion xidVer, - boolean implicit, - boolean implicitSingle, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - boolean invalidate, - boolean storeEnabled, - int txSize, - @Nullable GridCacheTxKey grpLockKey, - boolean partLock, - @Nullable UUID subjId, - int taskNameHash - ) { - super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate, - storeEnabled, txSize, grpLockKey, subjId, taskNameHash); - - assert !partLock || grpLockKey != null; - - this.partLock = partLock; - - minVer = xidVer; - } - - /** {@inheritDoc} */ - @Override public UUID eventNodeId() { - return cctx.localNodeId(); - } - - /** {@inheritDoc} */ - @Override public UUID originatingNodeId() { - return cctx.localNodeId(); - } - - /** {@inheritDoc} */ - @Override public boolean empty() { - return txMap.isEmpty(); - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> masterNodeIds() { - return Collections.singleton(nodeId); - } - - /** {@inheritDoc} */ - @Override public boolean partitionLock() { - return partLock; - } - - /** {@inheritDoc} */ - @Override public Throwable commitError() { - return commitErr.get(); - } - - /** {@inheritDoc} */ - @Override public void commitError(Throwable e) { - commitErr.compareAndSet(null, e); - } - - /** {@inheritDoc} */ - @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) { - assert false; - return false; - } - - /** - * Gets collection of active cache IDs for this transaction. - * - * @return Collection of active cache IDs. - */ - @Override public Collection<Integer> activeCacheIds() { - return activeCacheIds; - } - - /** {@inheritDoc} */ - @Override public boolean isStarted() { - return txMap != null; - } - - /** {@inheritDoc} */ - @Override public boolean hasWriteKey(GridCacheTxKey<K> key) { - return writeView.containsKey(key); - } - - /** - * @return Transaction read set. - */ - @Override public Set<GridCacheTxKey<K>> readSet() { - return txMap == null ? Collections.<GridCacheTxKey<K>>emptySet() : readView.keySet(); - } - - /** - * @return Transaction write set. - */ - @Override public Set<GridCacheTxKey<K>> writeSet() { - return txMap == null ? Collections.<GridCacheTxKey<K>>emptySet() : writeView.keySet(); - } - - /** {@inheritDoc} */ - @Override public boolean removed(GridCacheTxKey<K> key) { - if (txMap == null) - return false; - - GridCacheTxEntry<K, V> e = txMap.get(key); - - return e != null && e.op() == DELETE; - } - - /** {@inheritDoc} */ - @Override public Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> readMap() { - return readView == null ? Collections.<GridCacheTxKey<K>, GridCacheTxEntry<K, V>>emptyMap() : readView; - } - - /** {@inheritDoc} */ - @Override public Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> writeMap() { - return writeView == null ? Collections.<GridCacheTxKey<K>, GridCacheTxEntry<K, V>>emptyMap() : writeView; - } - - /** {@inheritDoc} */ - @Override public Collection<GridCacheTxEntry<K, V>> allEntries() { - return txMap == null ? Collections.<GridCacheTxEntry<K, V>>emptySet() : txMap.values(); - } - - /** {@inheritDoc} */ - @Override public Collection<GridCacheTxEntry<K, V>> readEntries() { - return readView == null ? Collections.<GridCacheTxEntry<K, V>>emptyList() : readView.values(); - } - - /** {@inheritDoc} */ - @Override public Collection<GridCacheTxEntry<K, V>> writeEntries() { - return writeView == null ? Collections.<GridCacheTxEntry<K, V>>emptyList() : writeView.values(); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridCacheTxEntry<K, V> entry(GridCacheTxKey<K> key) { - return txMap == null ? null : txMap.get(key); - } - - /** {@inheritDoc} */ - @Override public void seal() { - if (readView != null) - readView.seal(); - - if (writeView != null) - writeView.seal(); - } - - /** - * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent - * to remote nodes. - */ - public void sendTransformedValues(boolean snd) { - sndTransformedVals = snd; - } - - /** - * @return {@code True} if should be committed after lock is acquired. - */ - protected boolean commitAfterLock() { - return implicit() && (!dht() || colocated()); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable @Override public GridTuple<V> peek( - GridCacheContext<K, V> cacheCtx, - boolean failFast, - K key, - IgnitePredicate<GridCacheEntry<K, V>>[] filter - ) throws GridCacheFilterFailedException { - GridCacheTxEntry<K, V> e = txMap == null ? null : txMap.get(cacheCtx.txKey(key)); - - if (e != null) { - // We should look at tx entry previous value. If this is a user peek then previous - // value is the same as value. If this is a filter evaluation peek then previous value holds - // value visible to filter while value contains value enlisted for write. - if (!F.isAll(e.cached().wrap(false), filter)) - return e.hasPreviousValue() ? F.t(CU.<V>failed(failFast, e.previousValue())) : null; - - return e.hasPreviousValue() ? F.t(e.previousValue()) : null; - } - - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> loadMissing( - final GridCacheContext<K, V> cacheCtx, - boolean async, - final Collection<? extends K> keys, - boolean deserializePortable, - final IgniteBiInClosure<K, V> c - ) { - if (!async) { - try { - return new GridFinishedFuture<>(cctx.kernalContext(), - cacheCtx.store().loadAllFromStore(this, keys, c)); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - } - else - return cctx.kernalContext().closure().callLocalSafe( - new GPC<Boolean>() { - @Override public Boolean call() throws Exception { - return cacheCtx.store().loadAllFromStore(GridCacheTxLocalAdapter.this, keys, c); - } - }, - true); - } - - /** - * Gets minimum version present in transaction. - * - * @return Minimum versions. - */ - @Override public GridCacheVersion minVersion() { - return minVer; - } - - /** - * @throws IgniteCheckedException If prepare step failed. - */ - @SuppressWarnings({"CatchGenericClass"}) - public void userPrepare() throws IgniteCheckedException { - if (state() != PREPARING) { - if (timedOut()) - throw new IgniteTxTimeoutException("Transaction timed out: " + this); - - IgniteTxState state = state(); - - setRollbackOnly(); - - throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']'); - } - - checkValid(); - - try { - cctx.tm().prepareTx(this); - } - catch (IgniteCheckedException e) { - throw e; - } - catch (Throwable e) { - setRollbackOnly(); - - throw new IgniteCheckedException("Transaction validation produced a runtime exception: " + this, e); - } - } - - /** {@inheritDoc} */ - @Override public void commit() throws IgniteCheckedException { - try { - commitAsync().get(); - } - finally { - cctx.tm().txContextReset(); - } - } - - /** {@inheritDoc} */ - @Override public void prepare() throws IgniteCheckedException { - prepareAsync().get(); - } - - /** - * Checks that locks are in proper state for commit. - * - * @param entry Cache entry to check. - */ - private void checkCommitLocks(GridCacheEntryEx<K, V> entry) { - assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry + - ", tx=" + this + ']'; - } - - /** - * Uncommits transaction by invalidating all of its entries. - */ - @SuppressWarnings({"CatchGenericClass"}) - private void uncommit() { - for (GridCacheTxEntry<K, V> e : writeMap().values()) { - try { - GridCacheEntryEx<K, V> cacheEntry = e.cached(); - - if (e.op() != NOOP) - cacheEntry.invalidate(null, xidVer); - } - catch (Throwable t) { - U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t); - - break; - } - } - - cctx.tm().uncommitTx(this); - } - - /** - * Gets cache entry for given key. - * - * @param key Key. - * @return Cache entry. - */ - protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, GridCacheTxKey<K> key) { - return cacheCtx.cache().entryEx(key.key()); - } - - /** - * Gets cache entry for given key and topology version. - * - * @param key Key. - * @param topVer Topology version. - * @return Cache entry. - */ - protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, GridCacheTxKey<K> key, long topVer) { - return cacheCtx.cache().entryEx(key.key(), topVer); - } - - /** - * Performs batch database operations. This commit must be called - * before {@link #userCommit()}. This way if there is a DB failure, - * cache transaction can still be rolled back. - * - * @param writeEntries Transaction write set. - * @throws IgniteCheckedException If batch update failed. - */ - @SuppressWarnings({"CatchGenericClass"}) - protected void batchStoreCommit(Iterable<GridCacheTxEntry<K, V>> writeEntries) throws IgniteCheckedException { - GridCacheStoreManager<K, V> store = store(); - - if (store != null && storeEnabled() && (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) { - try { - if (writeEntries != null) { - Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null; - List<K> rmvCol = null; - - boolean skipNear = near() && store.writeToStoreFromDht(); - - for (GridCacheTxEntry<K, V> e : writeEntries) { - if (skipNear && e.cached().isNear()) - continue; - - boolean intercept = e.context().config().getInterceptor() != null; - - if (intercept || !F.isEmpty(e.transformClosures())) - e.cached().unswap(true, false); - - GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false); - - GridCacheContext<K, V> cacheCtx = e.context(); - - GridCacheOperation op = res.get1(); - K key = e.key(); - V val = res.get2(); - GridCacheVersion ver = writeVersion(); - - if (op == CREATE || op == UPDATE) { - // Batch-process all removes if needed. - if (rmvCol != null && !rmvCol.isEmpty()) { - store.removeAllFromStore(this, rmvCol); - - // Reset. - rmvCol.clear(); - } - - if (intercept) { - V old = e.cached().rawGetOrUnmarshal(true); - - val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val); - - if (val == null) - continue; - - val = cacheCtx.unwrapTemporary(val); - } - - if (putMap == null) - putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); - - putMap.put(key, F.t(val, ver)); - } - else if (op == DELETE) { - // Batch-process all puts if needed. - if (putMap != null && !putMap.isEmpty()) { - store.putAllToStore(this, putMap); - - // Reset. - putMap.clear(); - } - - if (intercept) { - V old = e.cached().rawGetOrUnmarshal(true); - - IgniteBiTuple<Boolean, V> t = cacheCtx.config().<K, V>getInterceptor() - .onBeforeRemove(key, old); - - if (cacheCtx.cancelRemove(t)) - continue; - } - - if (rmvCol == null) - rmvCol = new LinkedList<>(); - - rmvCol.add(key); - } - else if (log.isDebugEnabled()) - log.debug("Ignoring NOOP entry for batch store commit: " + e); - } - - if (putMap != null && !putMap.isEmpty()) { - assert rmvCol == null || rmvCol.isEmpty(); - - // Batch put at the end of transaction. - store.putAllToStore(this, putMap); - } - - if (rmvCol != null && !rmvCol.isEmpty()) { - assert putMap == null || putMap.isEmpty(); - - // Batch remove at the end of transaction. - store.removeAllFromStore(this, rmvCol); - } - } - - // Commit while locks are held. - store.txEnd(this, true); - } - catch (IgniteCheckedException ex) { - commitError(ex); - - setRollbackOnly(); - - // Safe to remove transaction from committed tx list because nothing was committed yet. - cctx.tm().removeCommittedTx(this); - - throw ex; - } - catch (Throwable ex) { - commitError(ex); - - setRollbackOnly(); - - // Safe to remove transaction from committed tx list because nothing was committed yet. - cctx.tm().removeCommittedTx(this); - - throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex); - } - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CatchGenericClass"}) - @Override public void userCommit() throws IgniteCheckedException { - IgniteTxState state = state(); - - if (state != COMMITTING) { - if (timedOut()) - throw new IgniteTxTimeoutException("Transaction timed out: " + this); - - setRollbackOnly(); - - throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']'); - } - - checkValid(); - - boolean empty = F.isEmpty(near() ? txMap : writeMap()); - - // Register this transaction as completed prior to write-phase to - // ensure proper lock ordering for removed entries. - // We add colocated transaction to committed set even if it is empty to correctly order - // locks on backup nodes. - if (!empty || colocated()) - cctx.tm().addCommittedTx(this); - - if (groupLock()) - addGroupTxMapping(writeSet()); - - if (!empty) { - // We are holding transaction-level locks for entries here, so we can get next write version. - writeVersion(cctx.versions().next(topologyVersion())); - - batchStoreCommit(writeMap().values()); - - try { - cctx.tm().txContext(this); - - long topVer = topologyVersion(); - - /* - * Commit to cache. Note that for 'near' transaction we loop through all the entries. - */ - for (GridCacheTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) { - GridCacheContext<K, V> cacheCtx = txEntry.context(); - - GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE; - - UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId(); - - try { - while (true) { - try { - GridCacheEntryEx<K, V> cached = txEntry.cached(); - - // Must try to evict near entries before committing from - // transaction manager to make sure locks are held. - if (!evictNearEntry(txEntry, false)) { - if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) { - cached.markObsolete(xidVer); - - break; - } - - if (cached.detached()) - break; - - GridCacheEntryEx<K, V> nearCached = null; - - boolean metrics = true; - - if (updateNearCache(cacheCtx, txEntry.key(), topVer)) - nearCached = cacheCtx.dht().near().peekEx(txEntry.key()); - else if (cacheCtx.isNear() && txEntry.locallyMapped()) - metrics = false; - - boolean evt = !isNearLocallyMapped(txEntry, false); - - // For near local transactions we must record DHT version - // in order to keep near entries on backup nodes until - // backup remote transaction completes. - if (cacheCtx.isNear()) - ((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion()); - - if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters())) - txEntry.cached().unswap(true, false); - - GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry, - true); - - GridCacheOperation op = res.get1(); - V val = res.get2(); - byte[] valBytes = res.get3(); - - // Preserve TTL if needed. - if (txEntry.ttl() < 0) - txEntry.ttl(cached.ttl()); - - // Deal with DR conflicts. - GridCacheVersion explicitVer = txEntry.drVersion() != null ? - txEntry.drVersion() : writeVersion(); - - GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached, - txEntry, - explicitVer, - op, - val, - valBytes, - txEntry.ttl(), - txEntry.drExpireTime()); - - if (drRes != null) { - op = drRes.operation(); - val = drRes.value(); - valBytes = drRes.valueBytes(); - - if (drRes.isMerge()) - explicitVer = writeVersion(); - } - else - // Nullify explicit version so that innerSet/innerRemove will work as usual. - explicitVer = null; - - if (sndTransformedVals || (drRes != null)) { - assert sndTransformedVals && cacheCtx.isReplicated() || (drRes != null); - - txEntry.value(val, true, false); - txEntry.valueBytes(valBytes); - txEntry.op(op); - txEntry.transformClosures(null); - txEntry.drVersion(explicitVer); - } - - if (op == CREATE || op == UPDATE) { - GridCacheUpdateTxResult<V> updRes = cached.innerSet( - this, - eventNodeId(), - txEntry.nodeId(), - val, - valBytes, - false, - false, - txEntry.ttl(), - evt, - metrics, - topVer, - txEntry.filters(), - cached.detached() ? DR_NONE : drType, - txEntry.drExpireTime(), - cached.isNear() ? null : explicitVer, - CU.subjectId(this, cctx), - resolveTaskName()); - - if (nearCached != null && updRes.success()) - nearCached.innerSet( - null, - eventNodeId(), - nodeId, - val, - valBytes, - false, - false, - txEntry.ttl(), - false, - metrics, - topVer, - CU.<K, V>empty(), - DR_NONE, - txEntry.drExpireTime(), - null, - CU.subjectId(this, cctx), - resolveTaskName()); - } - else if (op == DELETE) { - GridCacheUpdateTxResult<V> updRes = cached.innerRemove( - this, - eventNodeId(), - txEntry.nodeId(), - false, - false, - evt, - metrics, - topVer, - txEntry.filters(), - cached.detached() ? DR_NONE : drType, - cached.isNear() ? null : explicitVer, - CU.subjectId(this, cctx), - resolveTaskName()); - - if (nearCached != null && updRes.success()) - nearCached.innerRemove( - null, - eventNodeId(), - nodeId, - false, - false, - false, - metrics, - topVer, - CU.<K, V>empty(), - DR_NONE, - null, - CU.subjectId(this, cctx), - resolveTaskName()); - } - else if (op == RELOAD) { - cached.innerReload(CU.<K, V>empty()); - - if (nearCached != null) - nearCached.innerReload(CU.<K, V>empty()); - } - else if (op == READ) { - if (log.isDebugEnabled()) - log.debug("Ignoring READ entry when committing: " + txEntry); - } - else { - assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()): - "Transaction does not own lock for group lock entry during commit [tx=" + - this + ", txEntry=" + txEntry + ']'; - - if (log.isDebugEnabled()) - log.debug("Ignoring NOOP entry when committing: " + txEntry); - } - } - - // Check commit locks after set, to make sure that - // we are not changing obsolete entries. - // (innerSet and innerRemove will throw an exception - // if an entry is obsolete). - if (txEntry.op() != READ && !txEntry.groupLockEntry()) - checkCommitLocks(cached); - - // Break out of while loop. - break; - } - // If entry cached within transaction got removed. - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during transaction commit (will retry): " + txEntry); - - txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), txEntry.keyBytes()); - } - } - } - catch (Throwable ex) { - // We are about to initiate transaction rollback when tx has started to committing. - // Need to remove version from committed list. - cctx.tm().removeCommittedTx(this); - - if (X.hasCause(ex, GridCacheIndexUpdateException.class) && cacheCtx.cache().isMongoDataCache()) { - if (log.isDebugEnabled()) - log.debug("Failed to update mongo document index (transaction entry will " + - "be ignored): " + txEntry); - - // Set operation to NOOP. - txEntry.op(NOOP); - - setRollbackOnly(); - - throw ex; - } - else { - IgniteCheckedException err = new IgniteTxHeuristicException("Failed to locally write to cache " + - "(all transaction entries will be invalidated, however there was a window when " + - "entries for this transaction were visible to others): " + this, ex); - - U.error(log, "Heuristic transaction failure.", err); - - commitErr.compareAndSet(null, err); - - state(UNKNOWN); - - try { - // Courtesy to minimize damage. - uncommit(); - } - catch (Throwable ex1) { - U.error(log, "Failed to uncommit transaction: " + this, ex1); - } - - throw err; - } - } - } - } - finally { - cctx.tm().txContextReset(); - } - } - else { - GridCacheStoreManager<K, V> store = store(); - - if (store != null && (!internal() || groupLock())) { - try { - store.txEnd(this, true); - } - catch (IgniteCheckedException e) { - commitError(e); - - setRollbackOnly(); - - cctx.tm().removeCommittedTx(this); - - throw e; - } - } - } - - // Do not unlock transaction entries if one-phase commit. - if (!onePhaseCommit()) { - if (doneFlag.compareAndSet(false, true)) { - // Unlock all locks. - cctx.tm().commitTx(this); - - boolean needsCompletedVersions = needsCompletedVersions(); - - assert !needsCompletedVersions || completedBase != null; - assert !needsCompletedVersions || committedVers != null; - assert !needsCompletedVersions || rolledbackVers != null; - } - } - } - - /** - * Commits transaction to transaction manager. Used for one-phase commit transactions only. - */ - public void tmCommit() { - assert onePhaseCommit(); - - if (doneFlag.compareAndSet(false, true)) { - // Unlock all locks. - cctx.tm().commitTx(this); - - state(COMMITTED); - - boolean needsCompletedVersions = needsCompletedVersions(); - - assert !needsCompletedVersions || completedBase != null; - assert !needsCompletedVersions || committedVers != null; - assert !needsCompletedVersions || rolledbackVers != null; - } - } - - /** {@inheritDoc} */ - @Override public void completedVersions( - GridCacheVersion completedBase, - Collection<GridCacheVersion> committedVers, - Collection<GridCacheVersion> rolledbackVers) { - this.completedBase = completedBase; - this.committedVers = committedVers; - this.rolledbackVers = rolledbackVers; - } - - /** - * @return Completed base for ordering. - */ - public GridCacheVersion completedBase() { - return completedBase; - } - - /** - * @return Committed versions. - */ - public Collection<GridCacheVersion> committedVersions() { - return committedVers; - } - - /** - * @return Rolledback versions. - */ - public Collection<GridCacheVersion> rolledbackVersions() { - return rolledbackVers; - } - - /** {@inheritDoc} */ - @Override public void userRollback() throws IgniteCheckedException { - IgniteTxState state = state(); - - if (state != ROLLING_BACK && state != ROLLED_BACK) { - setRollbackOnly(); - - throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + ", tx=" + this + ']', - commitErr.get()); - } - - if (doneFlag.compareAndSet(false, true)) { - try { - if (near()) - // Must evict near entries before rolling back from - // transaction manager, so they will be removed from cache. - for (GridCacheTxEntry<K, V> e : allEntries()) - evictNearEntry(e, false); - - cctx.tm().rollbackTx(this); - - GridCacheStoreManager<K, V> store = store(); - - if (store != null && (near() || store.writeToStoreFromDht())) { - if (!internal() || groupLock()) - store.txEnd(this, false); - } - } - catch (Error | IgniteCheckedException | RuntimeException e) { - U.addLastCause(e, commitErr.get(), log); - - throw e; - } - } - } - - /** - * Checks if there is a cached or swapped value for - * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method. - * - * - * @param keys Key to enlist. - * @param cached Cached entry, if called from entry wrapper. - * @param map Return map. - * @param missed Map of missed keys. - * @param keysCnt Keys count (to avoid call to {@code Collection.size()}). - * @param deserializePortable Deserialize portable flag. - * @param filter Filter to test. - * @throws IgniteCheckedException If failed. - * @return Enlisted keys. - */ - @SuppressWarnings({"RedundantTypeArguments"}) - private Collection<K> enlistRead( - final GridCacheContext<K, V> cacheCtx, - Collection<? extends K> keys, - @Nullable GridCacheEntryEx<K, V> cached, - Map<K, V> map, - Map<K, GridCacheVersion> missed, - int keysCnt, - boolean deserializePortable, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException { - assert !F.isEmpty(keys); - assert keysCnt == keys.size(); - assert cached == null || F.first(keys).equals(cached.key()); - - cacheCtx.checkSecurity(GridSecurityPermission.CACHE_READ); - - groupLockSanityCheck(cacheCtx, keys); - - boolean single = keysCnt == 1; - - Collection<K> lockKeys = null; - - long topVer = topologyVersion(); - - // In this loop we cover only read-committed or optimistic transactions. - // Transactions that are pessimistic and not read-committed are covered - // outside of this loop. - for (K key : keys) { - if (key == null) - continue; - - if (pessimistic() && !readCommitted()) - addActiveCache(cacheCtx); - - GridCacheTxKey<K> txKey = cacheCtx.txKey(key); - - // Check write map (always check writes first). - GridCacheTxEntry<K, V> txEntry = entry(txKey); - - // Either non-read-committed or there was a previous write. - if (txEntry != null) { - if (cacheCtx.isAll(txEntry.cached(), filter)) { - V val = txEntry.value(); - - // Read value from locked entry in group-lock transaction as well. - if (txEntry.hasValue()) { - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } - - if (val != null) { - V val0 = val; - - if (cacheCtx.portableEnabled()) - val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); - - map.put(key, val0); - } - } - else { - assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry()); - - while (true) { - try { - Object transformClo = - (txEntry.op() == TRANSFORM && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? - F.first(txEntry.transformClosures()) : null; - - val = txEntry.cached().innerGet(this, - /*swap*/true, - /*read-through*/false, - /*fail fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*event*/true, - /*temporary*/false, - CU.subjectId(this, cctx), - transformClo, - resolveTaskName(), - filter); - - if (val != null) { - if (!readCommitted()) - txEntry.readValue(val); - - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } - - V val0 = val; - - if (cacheCtx.portableEnabled()) - val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); - - map.put(key, val0); - } - else - missed.put(key, txEntry.cached().version()); - - break; - } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + txEntry); - - if (!readCommitted()) - txEntry.readValue(e.<V>value()); - } - catch (GridCacheEntryRemovedException ignored) { - txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), txEntry.keyBytes()); - } - } - } - } - } - // First time access within transaction. - else { - if (lockKeys == null) - lockKeys = single ? (Collection<K>)keys : new ArrayList<K>(keysCnt); - - if (!single) - lockKeys.add(key); - - while (true) { - GridCacheEntryEx<K, V> entry; - - if (cached != null) { - entry = cached; - - cached = null; - } - else - entry = entryEx(cacheCtx, txKey, topVer); - - try { - GridCacheVersion ver = entry.version(); - - V val = null; - - if (!pessimistic() || readCommitted() || groupLock()) { - // This call will check for filter. - val = entry.innerGet(this, - /*swap*/true, - /*no read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*event*/true, - /*temporary*/false, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - filter); - - if (val != null) { - V val0 = val; - - if (cacheCtx.portableEnabled()) - val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); - - map.put(key, val0); - } - else - missed.put(key, ver); - } - else - // We must wait for the lock in pessimistic mode. - missed.put(key, ver); - - if (!readCommitted()) { - txEntry = addEntry(READ, val, null, entry, -1, filter, true, -1L, -1L, null); - - if (groupLock()) - txEntry.groupLockEntry(true); - - // As optimization, mark as checked immediately - // for non-pessimistic if value is not null. - if (val != null && !pessimistic()) - txEntry.markValid(); - } - - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key); - } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + entry); - - if (!readCommitted()) { - // Value for which failure occurred. - V val = e.<V>value(); - - txEntry = addEntry(READ, val, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L, null); - - // Mark as checked immediately for non-pessimistic. - if (val != null && !pessimistic()) - txEntry.markValid(); - } - - break; // While loop. - } - } - } - } - - return lockKeys != null ? lockKeys : Collections.<K>emptyList(); - } - - /** - * Adds skipped key. - * - * @param skipped Skipped set (possibly {@code null}). - * @param key Key to add. - * @return Skipped set. - */ - private Set<K> skip(Set<K> skipped, K key) { - if (skipped == null) - skipped = new GridLeanSet<>(); - - skipped.add(key); - - if (log.isDebugEnabled()) - log.debug("Added key to skipped set: " + key); - - return skipped; - } - - /** - * Loads all missed keys for - * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method. - * - * @param map Return map. - * @param missedMap Missed keys. - * @param redos Keys to retry. - * @param deserializePortable Deserialize portable flag. - * @param filter Filter. - * @return Loaded key-value pairs. - */ - private IgniteFuture<Map<K, V>> checkMissed( - final GridCacheContext<K, V> cacheCtx, - final Map<K, V> map, - final Map<K, GridCacheVersion> missedMap, - @Nullable final Collection<K> redos, - final boolean deserializePortable, - final IgnitePredicate<GridCacheEntry<K, V>>[] filter - ) { - assert redos != null || pessimistic(); - - if (log.isDebugEnabled()) - log.debug("Loading missed values for missed map: " + missedMap); - - final Collection<K> loaded = new HashSet<>(); - - return new GridEmbeddedFuture<>(cctx.kernalContext(), - loadMissing( - cacheCtx, - false, missedMap.keySet(), deserializePortable, new CI2<K, V>() { - /** */ - private GridCacheVersion nextVer; - - @Override public void apply(K key, V val) { - if (isRollbackOnly()) { - if (log.isDebugEnabled()) - log.debug("Ignoring loaded value for read because transaction was rolled back: " + - GridCacheTxLocalAdapter.this); - - return; - } - - GridCacheVersion ver = missedMap.get(key); - - if (ver == null) { - if (log.isDebugEnabled()) - log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); - - return; - } - - V visibleVal = val; - - GridCacheTxKey<K> txKey = cacheCtx.txKey(key); - - GridCacheTxEntry<K, V> txEntry = entry(txKey); - - if (txEntry != null) { - if (!readCommitted()) - txEntry.readValue(val); - - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - visibleVal = clos.apply(visibleVal); - } - } - - // In pessimistic mode we hold the lock, so filter validation - // should always be valid. - if (pessimistic()) - ver = null; - - // Initialize next version. - if (nextVer == null) - nextVer = cctx.versions().next(topologyVersion()); - - while (true) { - assert txEntry != null || readCommitted() || groupLock(); - - GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - - try { - boolean pass = cacheCtx.isAll(e, filter); - - // Must initialize to true since even if filter didn't pass, - // we still record the transaction value. - boolean set = true; - - if (pass) { - try { - set = e.versionedValue(val, ver, nextVer); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAll method " + - "(will try again): " + e); - - if (pessimistic() && !readCommitted() && !isRollbackOnly() && - (!groupLock() || F.eq(e.key(), groupLockKey()))) { - U.error(log, "Inconsistent transaction state (entry got removed while " + - "holding lock) [entry=" + e + ", tx=" + GridCacheTxLocalAdapter.this + "]"); - - setRollbackOnly(); - - return; - } - - if (txEntry != null) - txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes()); - - continue; // While loop. - } - } - - // In pessimistic mode, we should always be able to set. - assert set || !pessimistic(); - - if (readCommitted() || groupLock()) { - cacheCtx.evicts().touch(e, topologyVersion()); - - if (pass && visibleVal != null) - map.put(key, visibleVal); - } - else { - assert txEntry != null; - - if (set || F.isEmptyOrNulls(filter)) { - txEntry.setAndMarkValid(val); - - if (pass && visibleVal != null) - map.put(key, visibleVal); - } - else { - assert !pessimistic() : "Pessimistic transaction should not have to redo gets: " + - this; - - if (log.isDebugEnabled()) - log.debug("Failed to set versioned value for entry (will redo): " + e); - - redos.add(key); - } - } - - loaded.add(key); - - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry from transaction [set=" + set + - ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); - - break; // While loop. - } - catch (IgniteCheckedException ex) { - throw new IgniteException("Failed to put value for cache entry: " + e, ex); - } - } - } - }), - new C2<Boolean, Exception, Map<K, V>>() { - @Override public Map<K, V> apply(Boolean b, Exception e) { - if (e != null) { - setRollbackOnly(); - - throw new GridClosureException(e); - } - - if (!b && !readCommitted()) { - // There is no store - we must mark the entries. - for (K key : missedMap.keySet()) { - GridCacheTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key)); - - if (txEntry != null) - txEntry.markValid(); - } - } - - if (readCommitted()) { - Collection<K> notFound = new HashSet<>(missedMap.keySet()); - - notFound.removeAll(loaded); - - // In read-committed mode touch entries that have just been read. - for (K key : notFound) { - GridCacheTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key)); - - GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) : - txEntry.cached(); - - if (entry != null) - cacheCtx.evicts().touch(entry, topologyVersion()); - } - } - - return map; - } - }); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<Map<K, V>> getAllAsync( - final GridCacheContext<K, V> cacheCtx, - Collection<? extends K> keys, - @Nullable GridCacheEntryEx<K, V> cached, final boolean deserializePortable, - final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - if (F.isEmpty(keys)) - return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap()); - - init(); - - int keysCnt = keys.size(); - - boolean single = keysCnt == 1; - - try { - checkValid(); - - final Map<K, V> retMap = new GridLeanMap<>(keysCnt); - - final Map<K, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0); - - final Collection<K> lockKeys = enlistRead(cacheCtx, keys, cached, retMap, missed, keysCnt, - deserializePortable, filter); - - if (single && missed.isEmpty()) - return new GridFinishedFuture<>(cctx.kernalContext(), retMap); - - // Handle locks. - if (pessimistic() && !readCommitted() && !groupLock()) { - IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true, - isolation, isInvalidate(), CU.<K, V>empty()); - - PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() { - @Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Acquired transaction lock for read on keys: " + lockKeys); - - // Load keys only after the locks have been acquired. - for (K key : lockKeys) { - if (retMap.containsKey(key)) - // We already have a return value. - continue; - - GridCacheTxKey<K> txKey = cacheCtx.txKey(key); - - GridCacheTxEntry<K, V> txEntry = entry(txKey); - - assert txEntry != null; - - // Check if there is cached value. - while (true) { - GridCacheEntryEx<K, V> cached = txEntry.cached(); - - try { - Object transformClo = - (!F.isEmpty(txEntry.transformClosures()) && - cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? - F.first(txEntry.transformClosures()) : null; - - V val = cached.innerGet(GridCacheTxLocalAdapter.this, - cacheCtx.isSwapOrOffheapEnabled(), - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*events*/true, - /*temporary*/true, - CU.subjectId(GridCacheTxLocalAdapter.this, cctx), - transformClo, - resolveTaskName(), - filter); - - // If value is in cache and passed the filter. - if (val != null) { - missed.remove(key); - - txEntry.setAndMarkValid(val); - - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } - - if (cacheCtx.portableEnabled()) - val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); - - retMap.put(key, val); - } - - // Even though we bring the value back from lock acquisition, - // we still need to recheck primary node for consistent values - // in case of concurrent transactional locks. - - break; // While. - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed exception in get postLock (will retry): " + - cached); - - txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes()); - } - catch (GridCacheFilterFailedException e) { - // Failed value for the filter. - V val = e.value(); - - if (val != null) { - // If filter fails after lock is acquired, we don't reload, - // regardless if value is null or not. - missed.remove(key); - - txEntry.setAndMarkValid(val); - } - - break; // While. - } - } - } - - if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal())) - return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, filter); - - return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap()); - } - }; - - FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() { - @Override Map<K, V> finish(Map<K, V> loaded) { - retMap.putAll(loaded); - - return retMap; - } - }; - - if (fut.isDone()) { - try { - IgniteFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null); - - return fut1.isDone() ? - new GridFinishedFutureEx<>(finClos.apply(fut1.get(), null)) : - new GridEmbeddedFuture<>(cctx.kernalContext(), fut1, finClos); - } - catch (GridClosureException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e.unwrap()); - } - catch (IgniteCheckedException e) { - try { - return plc2.apply(false, e); - } - catch (Exception e1) { - return new GridFinishedFuture<>(cctx.kernalContext(), e1); - } - } - } - else { - return new GridEmbeddedFuture<>( - cctx.kernalContext(), - fut, - plc2, - finClos); - } - } - else { - assert optimistic() || readCommitted() || groupLock(); - - final Collection<K> redos = new LinkedList<>(); - - if (!missed.isEmpty()) { - if (!readCommitted()) - for (Iterator<K> it = missed.keySet().iterator(); it.hasNext(); ) - if (retMap.containsKey(it.next())) - it.remove(); - - if (missed.isEmpty()) - return new GridFinishedFuture<>(cctx.kernalContext(), retMap); - - return new GridEmbeddedFuture<>( - cctx.kernalContext(), - // First future. - checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, filter), - // Closure that returns another future, based on result from first. - new PMC<Map<K, V>>() { - @Override public IgniteFuture<Map<K, V>> postMiss(Map<K, V> map) { - if (redos.isEmpty()) - return new GridFinishedFuture<>(cctx.kernalContext(), - Collections.<K, V>emptyMap()); - - if (log.isDebugEnabled()) - log.debug("Starting to future-recursively get values for keys: " + redos); - - // Future recursion. - return getAllAsync(cacheCtx, redos, null, deserializePortable, filter); - } - }, - // Finalize. - new FinishClosure<Map<K, V>>() { - @Override Map<K, V> finish(Map<K, V> loaded) { - for (Map.Entry<K, V> entry : loaded.entrySet()) { - GridCacheTxEntry<K, V> txEntry = entry(cacheCtx.txKey(entry.getKey())); - - V val = entry.getValue(); - - if (!readCommitted()) - txEntry.readValue(val); - - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } - - retMap.put(entry.getKey(), val); - } - - return retMap; - } - } - ); - } - - return new GridFinishedFuture<>(cctx.kernalContext(), retMap); - } - } - catch (IgniteCheckedException e) { - setRollbackOnly(); - - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheReturn<V>> putAllAsync( - GridCacheContext<K, V> cacheCtx, - Map<? extends K, ? extends V> map, - boolean retval, - @Nullable GridCacheEntryEx<K, V> cached, - long ttl, - IgnitePredicate<GridCacheEntry<K, V>>[] filter - ) { - return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> putAllDrAsync( - GridCacheContext<K, V> cacheCtx, - Map<? extends K, GridCacheDrInfo<V>> drMap - ) { - return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheReturn<V>> transformAllAsync( - GridCacheContext<K, V> cacheCtx, - @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map, - boolean retval, - @Nullable GridCacheEntryEx<K, V> cached, - long ttl - ) { - return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeAllDrAsync( - GridCacheContext<K, V> cacheCtx, - Map<? extends K, GridCacheVersion> drMap - ) { - return removeAllAsync0(cacheCtx, null, drMap, null, false, null); - } - - /** - * Checks filter for non-pessimistic transactions. - * - * @param cached Cached entry. - * @param filter Filter to check. - * @return {@code True} if passed or pessimistic. - * @throws IgniteCheckedException If failed. - */ - private boolean filter(GridCacheEntryEx<K, V> cached, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException { - return pessimistic() || cached.context().isAll(cached, filter); - } - - /** - * Internal routine for <tt>putAll(..)</tt> - * - * @param keys Keys to enlist. - * @param cached Cached entry. - * @param ttl Time to live for entry. If negative, leave unchanged. - * @param implicit Implicit flag. - * @param lookup Value lookup map ({@code null} for remove). - * @param transformMap Map with transform closures if this is a transform operation. - * @param retval Flag indicating whether a value should be returned. - * @param lockOnly If {@code true}, then entry will be enlisted as noop. - * @param filter User filters. - * @param ret Return value. - * @param enlisted Collection of keys enlisted into this transaction. - * @param drPutMap DR put map (optional). - * @param drRmvMap DR remove map (optional). - * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions). - */ - protected IgniteFuture<Set<K>> enlistWrite( - GridCacheContext<K, V> cacheCtx, - Collection<? extends K> keys, - @Nullable GridCacheEntryEx<K, V> cached, - long ttl, - boolean implicit, - @Nullable Map<? extends K, ? extends V> lookup, - @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap, - boolean retval, - boolean lockOnly, - IgnitePredicate<GridCacheEntry<K, V>>[] filter, - final GridCacheReturn<V> ret, - Collection<K> enlisted, - @Nullable Map<? extends K, GridCacheDrInfo<V>> drPutMap, - @Nullable Map<? extends K, GridCacheVersion> drRmvMap - ) { - assert cached == null || keys.size() == 1; - assert cached == null || F.first(keys).equals(cached.key()); - - try { - addActiveCache(cacheCtx); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - - Set<K> skipped = null; - - boolean rmv = lookup == null && transformMap == null; - - try { - // Set transform flag for transaction. - if (transformMap != null) - transform = true; - - groupLockSanityCheck(cacheCtx, keys); - - for (K key : keys) { - V val = rmv || lookup == null ? null : lookup.get(key); - IgniteClosure<V, V> transformClo = transformMap == null ? null : transformMap.get(key); - - GridCacheVersion drVer; - long drTtl; - long drExpireTime; - - if (drPutMap != null) { - GridCacheDrInfo<V> info = drPutMap.get(key); - - assert info != null; - - drVer = info.version(); - drTtl = info.ttl(); - drExpireTime = info.expireTime(); - } - else if (drRmvMap != null) { - assert drRmvMap.get(key) != null; - - drVer = drRmvMap.get(key); - drTtl = -1L; - drExpireTime = -1L; - } - else { - drVer = null; - drTtl = -1L; - drExpireTime = -1L; - } - - if (key == null) - continue; - - if (!rmv && val == null && transformClo == null) { - skipped = skip(skipped, key); - - continue; - } - - if (cacheCtx.portableEnabled()) - key = (K)cacheCtx.marshalToPortable(key); - - GridCacheTxKey<K> txKey = cacheCtx.txKey(key); - - GridCacheTxEntry<K, V> txEntry = entry(txKey); - - // First time access. - if (txEntry == null) { - while (true) { - GridCacheEntryEx<K, V> entry; - - if (cached != null) { - entry = cached; - - cached = null; - } - else { - entry = entryEx(cacheCtx, txKey, topologyVersion()); - - entry.unswap(true, false); - } - - try { - // Check if lock is being explicitly acquired by the same thread. - if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && - entry.lockedByThread(threadId, xidVer)) - throw new IgniteCheckedException("Cannot access key within transaction if lock is " + - "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer + - ", threadId=" + threadId + - ", locNodeId=" + cctx.localNodeId() + ']'); - - V old = null; - - boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); - - if (optimistic()) { - try { - //Should read through if filter is specified. - old = entry.innerGet(this, - /*swap*/false, - /*read-through*/readThrough, - /*fail-fast*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - /*temporary*/false, - CU.subjectId(this, cctx), - transformClo, - resolveTaskName(), - CU.<K, V>empty()); - } - catch (GridCacheFilterFailedException e) { - e.printStackTrace(); - - assert false : "Empty filter failed: " + e; - } - } - else - old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); - - if (!filter(entry, filter)) { - skipped = skip(skipped, key); - - ret.set(old, false); - - if (!readCommitted() && old != null) { - // Enlist failed filters as reads for non-read-committed mode, - // so future ops will get the same values. - txEntry = addEntry(READ, old, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L, - null); - - txEntry.markValid(); - } - - if (readCommitted() || old == null) - cacheCtx.evicts().touch(entry, topologyVersion()); - - break; // While. - } - - txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM : - old != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl, - drExpireTime, drVer); - - if (!implicit() && readCommitted()) - cacheCtx.evicts().touch(entry, topologyVersion()); - - if (groupLock() && !lockOnly) - txEntry.groupLockEntry(true); - - enlisted.add(key); - - if (!pessimistic() || (groupLock() && !lockOnly)) { - txEntry.markValid(); - - if (old == null) { - if (retval && !readThrough) { - // If return value is required, then we know for sure that there is only - // one key in the keys collection. - assert keys.size() == 1; - - IgniteFuture<Boolean> fut = loadMissing( - cacheCtx, - true, - F.asList(key), - deserializePortables(cacheCtx), - new CI2<K, V>() { - @Override public void apply(K k, V v) { - if (log.isDebugEnabled()) - log.debug("Loaded value from remote node [key=" + k + ", val=" + - v + ']'); - - ret.set(v, true); - } - }); - - return new GridEmbeddedFuture<>( - cctx.kernalContext(), - fut, - new C2<Boolean, Exception, Set<K>>() { - @Override public Set<K> apply(Boolean b, Exception e) { - if (e != null) - throw new GridClosureException(e); - - return Collections.emptySet(); - } - } - ); - } - else - ret.set(null, true); - } - else - ret.set(old, true); - } - // Pessimistic. - else - ret.set(old, true); - - break; // While. - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction putAll0 method: " + entry); - } - } - } - else { - if (transformClo == null && txEntry.op() == TRANSFORM) - throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + - "transaction after transform closure is applied): " + key); - - GridCacheEntryEx<K, V> entry = txEntry.cached(); - - V v = txEntry.value(); - - boolean del = txEntry.op() == DELETE && rmv; - - if (!del) { - if (!filter(entry, filter)) { - skipped = skip(skipped, key); - - ret.set(v, false); - - continue; - } - - txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM : - v != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl, - drExpireTime, drVer); - - enlisted.add(key); - } - - if (!pessimistic()) { - txEntry.markValid(); - - // Set tx entry and return values. - ret.set(v, true); - } - } - } - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - - return new GridFinishedFuture<>(cctx.kernalContext(), skipped); - } - - /** - * Post lock processing for put or remove. - * - * @param keys Keys. - * @param failed Collection of potentially failed keys (need to populate in this method). - * @param transformed Output map where transformed values will be placed. - * @param transformMap Transform map. - * @param ret Return value. - * @param rmv {@code True} if remove. - * @param retval Flag to return value or not. - * @param filter Filter to check entries. - * @return Failed keys. - * @throws IgniteCheckedException If error. - */ - protected Set<K> postLockWrite( - GridCacheContext<K, V> cacheCtx, - Iterable<? extends K> keys, - Set<K> failed, - @Nullable Map<K, V> transformed, - @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap, - GridCacheReturn<V> ret, - boolean rmv, - boolean retval, - IgnitePredicate<GridCacheEntry<K, V>>[] filter - ) throws IgniteCheckedException { - for (K k : keys) { - GridCacheTxEntry<K, V> txEntry = entry(cacheCtx.txKey(k)); - - if (txEntry == null) - throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " + - "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']'); - - while (true) { - GridCacheEntryEx<K, V> cached = txEntry.cached(); - - try { - assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() : - "Transaction lock is not acquired [entry=" + cached + ", tx=" + this + - ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']'; - - if (log.isDebugEnabled()) - log.debug("Post lock write entry: " + cached); - - V v = txEntry.previousValue(); - boolean hasPrevVal = txEntry.hasPreviousValue(); - - if (onePhaseCommit()) - filter = txEntry.filters(); - - // If we have user-passed filter, we must read value into entry for peek(). - if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter)) - retval = true; - - if (retval) { - if (!cacheCtx.isNear()) { - try { - if (!hasPrevVal) - v = cached.innerGet(this, - /*swap*/retval, - /*read-through*/retval, - /*failFast*/false, - /*unmarshal*/retval, - /*metrics*/true, - /*event*/!dht(), - /*temporary*/false, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - CU.<K, V>empty()); - } - catch (GridCacheFilterFailedException e) { - e.printStackTrace(); - - assert false : "Empty filter failed: " + e; - } - } - else { - if (!hasPrevVal) - v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet(); - } - - ret.value(v); - } - - boolean pass = cacheCtx.isAll(cached, filter); - - // For remove operation we return true only if we are removing s/t, - // i.e. cached value is not null. - ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null)); - - if (onePhaseCommit()) - txEntry.filtersPassed(pass); - - if (pass) { - txEntry.markValid(); - - if (log.isDebugEnabled()) - log.debug("Filter passed in post lock for key: " + k); - } - else { - failed = skip(failed, k); - - // Revert operation to previous. (if no - NOOP, so entry will be unlocked). - txEntry.setAndMarkValid(txEntry.previousOperation(), ret.value()); - txEntry.filters(CU.<K, V>empty()); - txEntry.filtersSet(false); - } - - break; // While. - } - // If entry cached within transaction got removed before lock. - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in putAllAsync method (will retry): " + cached); - - txEntry.cached(entryEx(cached.context(), txEntry.txKey()), txEntry.keyBytes()); - } - } - } - - if (log.isDebugEnabled()) - log.debug("Entries that failed after lock filter check: " + failed); - - return failed; - } - - /** - * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap} - * maps must be non-null. - * - * @param map Key-value map to store. - * @param transformMap Transform map. - * @param drMap DR map. - * @param retval Key-transform value map to store. - * @param cached Cached entry, if any. - * @param ttl Time to live. - * @param filter Filter. - * @return Operation future. - */ - private IgniteFuture<GridCacheReturn<V>> putAllAsync0( - final GridCacheCon
<TRUNCATED>