http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 3a1a80a..3056ae5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -50,19 +50,32 @@ import static org.apache.ignite.transactions.TransactionState.*; * */ @SuppressWarnings("unchecked") -public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx> - implements GridCacheMvccFuture<IgniteInternalTx> { +public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse> + implements GridCacheMvccFuture<GridNearTxPrepareResponse> { /** */ private static final long serialVersionUID = 0L; /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + /** */ + private static final IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse> REDUCER = + new IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse>() { + @Override public boolean collect(IgniteInternalTx e) { + return true; + } + + @Override public GridNearTxPrepareResponse reduce() { + // Nothing to aggregate. + return null; + } + }; + /** Logger. */ private static IgniteLogger log; /** Context. */ - private GridCacheSharedContext<K, V> cctx; + private GridCacheSharedContext<?, ?> cctx; /** Future ID. */ private IgniteUuid futId; @@ -128,15 +141,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** */ private boolean invoke; - /** */ - private IgniteInClosure<GridNearTxPrepareResponse> completeCb; - /** * @param cctx Context. * @param tx Transaction. * @param nearMiniId Near mini future id. * @param dhtVerMap DHT versions map. * @param last {@code True} if this is last prepare operation for node. + * @param retVal Return value flag. * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. */ public GridDhtTxPrepareFuture( @@ -146,19 +157,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu Map<IgniteTxKey, GridCacheVersion> dhtVerMap, boolean last, boolean retVal, - Collection<UUID> lastBackups, - IgniteInClosure<GridNearTxPrepareResponse> completeCb + Collection<UUID> lastBackups ) { - super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() { - @Override public boolean collect(IgniteInternalTx e) { - return true; - } - - @Override public IgniteInternalTx reduce() { - // Nothing to aggregate. - return tx; - } - }); + super(REDUCER); this.cctx = cctx; this.tx = tx; @@ -178,8 +179,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu this.retVal = retVal; - this.completeCb = completeCb; - assert dhtMap != null; assert nearMap != null; } @@ -382,7 +381,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @param t Error. */ public void onError(Throwable t) { - onDone(tx, t); + onDone(null, t); } /** @@ -415,8 +414,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (log.isDebugEnabled()) log.debug("Marking all local candidates as ready: " + this); - Iterable<IgniteTxEntry> checkEntries = tx.groupLock() ? - Collections.singletonList(tx.groupLockEntry()) : writes; + Iterable<IgniteTxEntry> checkEntries = writes; for (IgniteTxEntry txEntry : checkEntries) { GridCacheContext cacheCtx = txEntry.context(); @@ -432,10 +430,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu txEntry.cached(entry); } - if (tx.optimistic() && txEntry.explicitVersion() == null) { - if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey())) - lockKeys.add(txEntry.txKey()); - } + if (tx.optimistic() && txEntry.explicitVersion() == null) + lockKeys.add(txEntry.txKey()); while (true) { try { @@ -479,7 +475,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } /** {@inheritDoc} */ - @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) { + @Override public boolean onDone(GridNearTxPrepareResponse res0, Throwable err) { assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " + "pending mini futures: " + this; @@ -495,16 +491,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu // Must create prepare response before transaction is committed to grab correct return value. final GridNearTxPrepareResponse res = createPrepareResponse(); - onComplete(); + onComplete(res); - if (!tx.near()) { + if (tx.commitOnPrepare()) { if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) { IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync(); fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override - public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { + @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { try { if (replied.compareAndSet(false, true)) sendPrepareResponse(res); @@ -530,15 +525,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } else { if (replied.compareAndSet(false, true)) { + GridNearTxPrepareResponse res = createPrepareResponse(); + try { - sendPrepareResponse(createPrepareResponse()); + sendPrepareResponse(res); } catch (IgniteCheckedException e) { U.error(log, "Failed to send prepare response for transaction: " + tx, e); } finally { // Will call super.onDone(). - onComplete(); + onComplete(res); } return true; @@ -562,16 +559,12 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } /** + * @param res Response. * @throws IgniteCheckedException If failed to send response. */ private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException { if (!tx.nearNodeId().equals(cctx.localNodeId())) cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); - else { - assert completeCb != null; - - completeCb.apply(res); - } } /** @@ -616,10 +609,10 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu for (IgniteTxEntry e : writes) { IgniteTxEntry txEntry = tx.entry(e.txKey()); - GridCacheContext cacheCtx = txEntry.context(); - assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']'; + GridCacheContext cacheCtx = txEntry.context(); + while (true) { try { GridCacheEntryEx entry = txEntry.cached(); @@ -682,13 +675,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** * Completeness callback. * + * @param res Response. * @return {@code True} if {@code done} flag was changed as a result of this call. */ - private boolean onComplete() { + private boolean onComplete(@Nullable GridNearTxPrepareResponse res) { if (last || tx.isSystemInvalidate()) tx.state(PREPARED); - if (super.onDone(tx, err.get())) { + if (super.onDone(res, err.get())) { // Don't forget to clean up. cctx.mvcc().removeFuture(this); @@ -702,7 +696,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * Completes this future. */ public void complete() { - onComplete(); + onComplete(null); } /** @@ -717,7 +711,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (tx.empty()) { tx.setRollbackOnly(); - onDone(tx); + onDone((GridNearTxPrepareResponse)null); } this.reads = reads; @@ -806,8 +800,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu tx, dhtWrites, nearWrites, - tx.groupLockKey(), - tx.partitionLock(), txNodes, tx.nearXidVersion(), true, @@ -821,14 +813,11 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu try { GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); - GridCacheContext<K, V> cacheCtx = cached.context(); + GridCacheContext<?, ?> cacheCtx = cached.context(); if (entry.explicitVersion() == null) { GridCacheMvccCandidate added = cached.candidate(version()); - assert added != null || entry.groupLockEntry() : - "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future " + "[added=" + added + ", entry=" + entry + ']'; @@ -909,8 +898,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu tx, null, nearMapping.writes(), - tx.groupLockKey(), - tx.partitionLock(), tx.transactionNodes(), tx.nearXidVersion(), true, @@ -923,8 +910,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (entry.explicitVersion() == null) { GridCacheMvccCandidate added = entry.cached().candidate(version()); - assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + "[added=" + added + ", entry=" + entry + ']'; @@ -977,7 +962,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridCacheContext cacheCtx = entry.context(); - GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + GridDhtCacheAdapter<?, ?> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(entry); @@ -1234,7 +1219,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED); for (GridCacheEntryInfo info : res.preloadEntries()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId()); + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(info.cacheId()); while (true) { GridCacheEntryEx entry = cacheCtx.cache().entryEx(info.key());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index c033273..73f86fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -104,8 +104,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param tx Transaction. * @param dhtWrites DHT writes. * @param nearWrites Near writes. - * @param grpLockKey Group lock key if preparing group-lock transaction. - * @param partLock {@code True} if group-lock transaction locks partition. * @param txNodes Transaction nodes mapping. * @param nearXidVer Near transaction ID. * @param last {@code True} if this is last prepare request for node. @@ -118,15 +116,13 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { GridDhtTxLocalAdapter tx, Collection<IgniteTxEntry> dhtWrites, Collection<IgniteTxEntry> nearWrites, - IgniteTxKey grpLockKey, - boolean partLock, Map<UUID, Collection<UUID>> txNodes, GridCacheVersion nearXidVer, boolean last, boolean onePhaseCommit, UUID subjId, int taskNameHash) { - super(tx, null, dhtWrites, grpLockKey, partLock, txNodes, onePhaseCommit); + super(tx, null, dhtWrites, txNodes, onePhaseCommit); assert futId != null; assert miniId != null; @@ -337,79 +333,79 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } switch (writer.state()) { - case 25: + case 23: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 26: + case 24: if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries)) return false; writer.incrementState(); - case 27: + case 25: if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); - case 28: + case 26: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 29: + case 27: if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); - case 30: + case 28: if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 31: + case 29: if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); - case 32: + case 30: if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 33: + case 31: if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 34: + case 32: if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); - case 35: + case 33: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 36: + case 34: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 37: + case 35: if (!writer.writeMessage("topVer", topVer)) return false; @@ -431,7 +427,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { return false; switch (reader.state()) { - case 25: + case 23: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -439,7 +435,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 26: + case 24: invalidateNearEntries = reader.readBitSet("invalidateNearEntries"); if (!reader.isLastRead()) @@ -447,7 +443,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 27: + case 25: last = reader.readBoolean("last"); if (!reader.isLastRead()) @@ -455,7 +451,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 28: + case 26: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -463,7 +459,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 29: + case 27: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) @@ -471,7 +467,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 30: + case 28: nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -479,7 +475,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 31: + case 29: nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) @@ -487,7 +483,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 32: + case 30: ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -495,7 +491,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 33: + case 31: ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -503,7 +499,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 34: + case 32: preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) @@ -511,7 +507,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 35: + case 33: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -519,7 +515,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 36: + case 34: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -527,7 +523,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 37: + case 35: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -547,6 +543,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 38; + return 36; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 30464a5..0a69910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -77,7 +77,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param timeout Timeout. * @param ctx Cache context. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. * @param nearXidVer Near transaction ID. * @param txNodes Transaction nodes mapping. */ @@ -97,14 +96,13 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { boolean invalidate, long timeout, int txSize, - @Nullable IgniteTxKey grpLockKey, GridCacheVersion nearXidVer, Map<UUID, Collection<UUID>> txNodes, @Nullable UUID subjId, int taskNameHash ) { super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, - txSize, grpLockKey, subjId, taskNameHash); + txSize, subjId, taskNameHash); assert nearNodeId != null; assert rmtFutId != null; @@ -139,7 +137,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param timeout Timeout. * @param ctx Cache context. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if transaction is group-lock. */ public GridDhtTxRemote( GridCacheSharedContext ctx, @@ -158,12 +155,11 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { boolean invalidate, long timeout, int txSize, - @Nullable IgniteTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) { super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, - txSize, grpLockKey, subjId, taskNameHash); + txSize, subjId, taskNameHash); assert nearNodeId != null; assert rmtFutId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java index 8da4da5..098ec97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java @@ -86,7 +86,9 @@ public class GridNoStorageCacheMap extends GridCacheConcurrentMap { boolean create) { if (create) { - GridCacheMapEntry entry = new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0); + GridCacheMapEntry entry = ctx.useOffheapEntry() ? + new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0) : + new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0); return new GridTriple<>(entry, null, null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 905f7bf..19d88e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -124,6 +124,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheMapEntry next, int hdrId) { + if (ctx.useOffheapEntry()) + return new GridDhtAtomicOffHeapCacheEntry(ctx, topVer, key, hash, val, next, hdrId); + return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val, next, hdrId); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java new file mode 100644 index 0000000..91a8e65 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; + +/** + * DHT atomic cache entry for off-heap tiered or off-heap values modes. + */ +public class GridDhtAtomicOffHeapCacheEntry extends GridDhtAtomicCacheEntry { + /** Off-heap value pointer. */ + private long valPtr; + + /** + * @param ctx Cache context. + * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). + * @param key Cache key. + * @param hash Key hash value. + * @param val Entry value. + * @param next Next entry in the linked list. + * @param hdrId Header id. + */ + public GridDhtAtomicOffHeapCacheEntry(GridCacheContext ctx, + AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + int hdrId) { + super(ctx, topVer, key, hash, val, next, hdrId); + } + + /** {@inheritDoc} */ + @Override protected boolean hasOffHeapPointer() { + return valPtr != 0; + } + + /** {@inheritDoc} */ + @Override protected long offHeapPointer() { + return valPtr; + } + + /** {@inheritDoc} */ + @Override protected void offHeapPointer(long valPtr) { + this.valPtr = valPtr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index c92d9ce..05b3c7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -86,6 +86,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte GridCacheMapEntry next, int hdrId) { + if (ctx.useOffheapEntry()) + return new GridDhtColocatedOffHeapCacheEntry(ctx, topVer, key, hash, val, next, hdrId); + return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val, next, hdrId); } }); @@ -126,7 +129,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean allowDetached ) { return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ? - new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0) : entryExx(key, topVer); + createEntry(key) : entryExx(key, topVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index f10baa3..e6a4eaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -754,8 +754,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity mappedKeys.size(), inTx() ? tx.size() : mappedKeys.size(), inTx() && tx.syncCommit(), - inTx() ? tx.groupLockKey() : null, - inTx() && tx.partitionLock(), inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, @@ -1090,10 +1088,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity // If primary node left the grid before lock acquisition, fail the whole future. throw newTopologyException(null, primary.id()); - if (inTx() && tx.groupLock() && !primary.isLocal()) - throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " + - " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']'); - if (mapping == null || !primary.id().equals(mapping.node().id())) mapping = new GridNearLockMapping(primary, key); else @@ -1281,25 +1275,18 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); - try { - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); - - return; - } - - // Set value to detached entry. - entry.resetFromPrimary(newVal, dhtVer); - - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); - } - catch (IgniteCheckedException e) { - onDone(e); + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); return; } + + // Set value to detached entry. + entry.resetFromPrimary(newVal, dhtVer); + + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); } else cctx.mvcc().markExplicitOwner(k, threadId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java new file mode 100644 index 0000000..ed842ad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; + +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; + +/** + * Cache entry for colocated cache for off-heap tiered or off-heap values modes. + */ +public class GridDhtColocatedOffHeapCacheEntry extends GridDhtColocatedCacheEntry { + /** Off-heap value pointer. */ + private long valPtr; + + /** + * @param ctx Cache context. + * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). + * @param key Cache key. + * @param hash Key hash value. + * @param val Entry value. + * @param next Next entry in the linked list. + * @param hdrId Header id. + */ + public GridDhtColocatedOffHeapCacheEntry(GridCacheContext ctx, + AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + int hdrId) { + super(ctx, topVer, key, hash, val, next, hdrId); + } + + /** {@inheritDoc} */ + @Override protected boolean hasOffHeapPointer() { + return valPtr != 0; + } + + /** {@inheritDoc} */ + @Override protected long offHeapPointer() { + return valPtr; + } + + /** {@inheritDoc} */ + @Override protected void offHeapPointer(long valPtr) { + this.valPtr = valPtr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index 5c4dd13..2c84bd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -46,10 +46,8 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { * * @param val Value. * @param ver Version. - * @throws IgniteCheckedException If value unmarshalling failed. */ - public void resetFromPrimary(CacheObject val, GridCacheVersion ver) - throws IgniteCheckedException { + public void resetFromPrimary(CacheObject val, GridCacheVersion ver) { value(val); this.ver = ver; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 45d332c..4b8db00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1252,7 +1252,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.kernalContext().timeout().removeTimeoutObject(old); GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter( - cctx.gridConfig().getNetworkTimeout() * cctx.gridConfig().getCacheConfiguration().length) { + cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) { @Override public void onTimeout() { if (isDone()) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 145e980..8258b14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -76,6 +76,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda ) { // Can't hold any locks here - this method is invoked when // holding write-lock on the whole cache map. + if (ctx.useOffheapEntry()) + return new GridNearOffHeapCacheEntry(ctx, key, hash, val, next, hdrId); + return new GridNearCacheEntry(ctx, key, hash, val, next, hdrId); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index c7fa4ab..797fd32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -204,15 +204,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { * @param topVer Topology version. * @return {@code True} if reset was done. * @throws GridCacheEntryRemovedException If obsolete. - * @throws IgniteCheckedException If failed. */ - @SuppressWarnings( {"RedundantTypeArguments"}) public boolean resetFromPrimary(CacheObject val, GridCacheVersion ver, GridCacheVersion dhtVer, UUID primaryNodeId, AffinityTopologyVersion topVer) - throws GridCacheEntryRemovedException, IgniteCheckedException + throws GridCacheEntryRemovedException { assert dhtVer != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index a427b65..0ffb4e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -890,8 +890,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B mappedKeys.size(), inTx() ? tx.size() : mappedKeys.size(), inTx() && tx.syncCommit(), - inTx() ? tx.groupLockKey() : null, - inTx() && tx.partitionLock(), inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, @@ -1188,10 +1186,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // If primary node left the grid before lock acquisition, fail the whole future. throw newTopologyException(null, primary.id()); - if (inTx() && tx.groupLock() && !primary.isLocal()) - throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " + - " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']'); - if (mapping == null || !primary.id().equals(mapping.node().id())) mapping = new GridNearLockMapping(primary, key); else @@ -1450,11 +1444,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Replace old entry with new one. entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); } - catch (IgniteCheckedException e) { - onDone(e); - - return; - } } i++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 1ba4bfe..e71dd65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -105,8 +104,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param keyCnt Number of keys. * @param txSize Expected transaction size. * @param syncCommit Synchronous commit flag. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param partLock If partition is locked. * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param accessTtl TTL for read operation. @@ -130,8 +127,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest { int keyCnt, int txSize, boolean syncCommit, - @Nullable IgniteTxKey grpLockKey, - boolean partLock, @Nullable UUID subjId, int taskNameHash, long accessTtl, @@ -151,8 +146,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest { timeout, keyCnt, txSize, - grpLockKey, - partLock, skipStore); assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; @@ -356,79 +349,79 @@ public class GridNearLockRequest extends GridDistributedLockRequest { } switch (writer.state()) { - case 23: + case 21: if (!writer.writeLong("accessTtl", accessTtl)) return false; writer.incrementState(); - case 24: + case 22: if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 25: + case 23: if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 26: + case 24: if (!writer.writeBoolean("hasTransforms", hasTransforms)) return false; writer.incrementState(); - case 27: + case 25: if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) return false; writer.incrementState(); - case 28: + case 26: if (!writer.writeBoolean("implicitTx", implicitTx)) return false; writer.incrementState(); - case 29: + case 27: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 30: + case 28: if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) return false; writer.incrementState(); - case 31: + case 29: if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); - case 32: + case 30: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 33: + case 31: if (!writer.writeBoolean("syncCommit", syncCommit)) return false; writer.incrementState(); - case 34: + case 32: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 35: + case 33: if (!writer.writeMessage("topVer", topVer)) return false; @@ -450,7 +443,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { return false; switch (reader.state()) { - case 23: + case 21: accessTtl = reader.readLong("accessTtl"); if (!reader.isLastRead()) @@ -458,7 +451,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 24: + case 22: dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); if (!reader.isLastRead()) @@ -466,7 +459,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 25: + case 23: filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); if (!reader.isLastRead()) @@ -474,7 +467,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 26: + case 24: hasTransforms = reader.readBoolean("hasTransforms"); if (!reader.isLastRead()) @@ -482,7 +475,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 27: + case 25: implicitSingleTx = reader.readBoolean("implicitSingleTx"); if (!reader.isLastRead()) @@ -490,7 +483,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 28: + case 26: implicitTx = reader.readBoolean("implicitTx"); if (!reader.isLastRead()) @@ -498,7 +491,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 29: + case 27: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -506,7 +499,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 30: + case 28: onePhaseCommit = reader.readBoolean("onePhaseCommit"); if (!reader.isLastRead()) @@ -514,7 +507,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 31: + case 29: retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) @@ -522,7 +515,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 32: + case 30: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -530,7 +523,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 33: + case 31: syncCommit = reader.readBoolean("syncCommit"); if (!reader.isLastRead()) @@ -538,7 +531,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 34: + case 32: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -546,7 +539,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 35: + case 33: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -566,7 +559,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 36; + return 34; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java new file mode 100644 index 0000000..25eb869 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.internal.processors.cache.*; + +/** + * Near cache entry for off-heap tiered or off-heap values modes. + */ +public class GridNearOffHeapCacheEntry extends GridNearCacheEntry { + /** Off-heap value pointer. */ + private long valPtr; + + /** + * @param ctx Cache context. + * @param key Cache key. + * @param hash Key hash value. + * @param val Entry value. + * @param next Next entry in the linked list. + * @param hdrId Header id. + */ + public GridNearOffHeapCacheEntry(GridCacheContext ctx, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + int hdrId) { + super(ctx, key, hash, val, next, hdrId); + } + + /** {@inheritDoc} */ + @Override protected boolean hasOffHeapPointer() { + return valPtr != 0; + } + + /** {@inheritDoc} */ + @Override protected long offHeapPointer() { + return valPtr; + } + + /** {@inheritDoc} */ + @Override protected void offHeapPointer(long valPtr) { + this.valPtr = valPtr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java new file mode 100644 index 0000000..4f74303 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -0,0 +1,768 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.transactions.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; +import static org.apache.ignite.transactions.TransactionState.*; + +/** + * + */ +public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter + implements GridCacheMvccFuture<IgniteInternalTx> { + /** */ + private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.optimistic() : tx; + } + + /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { + if (log.isDebugEnabled()) + log.debug("Transaction future received owner changed callback: " + entry); + + if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { + lockKeys.remove(entry.txKey()); + + // This will check for locks. + onDone(); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<? extends ClusterNode> nodes() { + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); + + return cctx.discovery().localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + + for (IgniteInternalFuture<?> fut : futures()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture) fut; + + if (f.node().id().equals(nodeId)) { + f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + + found = true; + } + } + } + + return found; + } + + /** + * @param nodeId Failed node ID. + * @param mappings Remaining mappings. + * @param e Error. + */ + void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) { + if (err.compareAndSet(null, e)) { + boolean marked = tx.setRollbackOnly(); + + if (e instanceof IgniteTxOptimisticCheckedException) { + assert nodeId != null : "Missing node ID for optimistic failure exception: " + e; + + tx.removeKeysMapping(nodeId, mappings); + } + + if (e instanceof IgniteTxRollbackCheckedException) { + if (marked) { + try { + tx.rollback(); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to automatically rollback transaction: " + tx, ex); + } + } + } + + onComplete(); + } + } + + /** + * @return {@code True} if all locks are owned. + */ + private boolean checkLocks() { + boolean locked = lockKeys.isEmpty(); + + if (locked) { + if (log.isDebugEnabled()) + log.debug("All locks are acquired for near prepare future: " + this); + } + else { + if (log.isDebugEnabled()) + log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); + } + + return locked; + } + + /** {@inheritDoc} */ + @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (!isDone()) { + for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + f.onResult(nodeId, res); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(IgniteInternalTx t, Throwable err) { + // If locks were not acquired yet, delay completion. + if (isDone() || (err == null && !checkLocks())) + return false; + + this.err.compareAndSet(null, err); + + if (err == null) + tx.state(PREPARED); + + if (super.onDone(tx, err)) { + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + + return true; + } + + return false; + } + + /** + * @param f Future. + * @return {@code True} if mini-future. + */ + private boolean isMini(IgniteInternalFuture<?> f) { + return f.getClass().equals(MiniFuture.class); + } + + /** + * Completeness callback. + */ + private void onComplete() { + if (super.onDone(tx, err.get())) + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + } + + /** {@inheritDoc} */ + @Override public void prepare() { + // Obtain the topology version to use. + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + if (topVer != null) { + tx.topologyVersion(topVer); + + prepare0(); + + return; + } + + prepareOnTopology(); + } + + /** + * + */ + private void prepareOnTopology() { + GridDhtTopologyFuture topFut = topologyReadLock(); + + try { + if (topFut == null) { + assert isDone(); + + return; + } + + if (topFut.isDone()) { + StringBuilder invalidCaches = new StringBuilder(); + + boolean cacheInvalid = false; + + for (GridCacheContext ctx : cctx.cacheContexts()) { + if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) { + if (cacheInvalid) + invalidCaches.append(", "); + + invalidCaches.append(U.maskName(ctx.name())); + + cacheInvalid = true; + } + } + + if (cacheInvalid) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + invalidCaches.toString())); + + return; + } + + tx.topologyVersion(topFut.topologyVersion()); + + prepare0(); + } + else { + topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override public void run() { + prepareOnTopology(); + } + }); + } + }); + } + } + finally { + topologyReadUnlock(); + } + } + + /** + * Acquires topology read lock. + * + * @return Topology ready future. + */ + private GridDhtTopologyFuture topologyReadLock() { + if (tx.activeCacheIds().isEmpty()) + return cctx.exchange().lastTopologyFuture(); + + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx == null) + return cctx.exchange().lastTopologyFuture(); + + nonLocCtx.topology().readLock(); + + if (nonLocCtx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + + return nonLocCtx.topology().topologyVersionFuture(); + } + + /** + * Releases topology read lock. + */ + private void topologyReadUnlock() { + if (!tx.activeCacheIds().isEmpty()) { + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx != null) + nonLocCtx.topology().readUnlock(); + } + } + + /** + * Initializes future. + */ + private void prepare0() { + try { + if (!tx.state(PREPARING)) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } + else + onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); + + return; + } + + // Make sure to add future before calling prepare. + cctx.mvcc().addFuture(this); + + prepare( + tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), + tx.writeEntries()); + + markInitialized(); + } + catch (TransactionTimeoutException | TransactionOptimisticException e) { + onError(cctx.localNodeId(), null, e); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + + /** + * @param reads Read entries. + * @param writes Write entries. + * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node. + */ + private void prepare( + Iterable<IgniteTxEntry> reads, + Iterable<IgniteTxEntry> writes + ) throws IgniteCheckedException { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer.topologyVersion() > 0; + + txMapping = new GridDhtTxMapping(); + + ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>(); + + if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { + onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name())); + + return; + } + } + } + + // Assign keys to primary nodes. + GridDistributedTxMapping cur = null; + + for (IgniteTxEntry read : reads) { + GridDistributedTxMapping updated = map(read, topVer, cur, false); + + if (cur != updated) { + mappings.offer(updated); + + if (updated.node().isLocal()) { + if (read.context().isNear()) + tx.nearLocallyMapped(true); + else if (read.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + cur = updated; + } + } + + for (IgniteTxEntry write : writes) { + GridDistributedTxMapping updated = map(write, topVer, cur, true); + + if (cur != updated) { + mappings.offer(updated); + + if (updated.node().isLocal()) { + if (write.context().isNear()) + tx.nearLocallyMapped(true); + else if (write.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + cur = updated; + } + } + + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } + + tx.addEntryMapping(mappings); + + cctx.mvcc().recheckPendingLocks(); + + txMapping.initLast(mappings); + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + proceedPrepare(mappings); + } + + /** + * Continues prepare after previous mapping successfully finished. + * + * @param mappings Queue of mappings. + */ + private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) { + if (isDone()) + return; + + final GridDistributedTxMapping m = mappings.poll(); + + if (m == null) + return; + + assert !m.empty(); + + final ClusterNode n = m.node(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + tx.optimistic() && tx.serializable() ? m.reads() : null, + m.writes(), + m.near(), + txMapping.transactionNodes(), + m.last(), + m.lastBackups(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash()); + + for (IgniteTxEntry txEntry : m.writes()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + // Must lock near entries separately. + if (m.near()) { + try { + tx.optimisticLockEntries(req.writes()); + + tx.userPrepare(); + } + catch (IgniteCheckedException e) { + onError(null, null, e); + } + } + + final MiniFuture fut = new MiniFuture(m, mappings); + + req.miniId(fut.futureId()); + + add(fut); // Append new future. + + // If this is the primary node for the keys. + if (n.isLocal()) { + // At this point, if any new node joined, then it is + // waiting for this transaction to complete, so + // partition reassignments are not possible here. + IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + + prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { + @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { + try { + fut.onResult(n.id(), prepFut.get()); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } + } + }); + } + else { + try { + cctx.io().send(n, req, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + // Fail the whole thing. + fut.onResult(e); + } + } + } + + /** + * @param entry Transaction entry. + * @param topVer Topology version. + * @param cur Current mapping. + * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key. + * @return Mapping. + */ + private GridDistributedTxMapping map( + IgniteTxEntry entry, + AffinityTopologyVersion topVer, + GridDistributedTxMapping cur, + boolean waitLock + ) throws IgniteCheckedException { + GridCacheContext cacheCtx = entry.context(); + + List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + + txMapping.addMapping(nodes); + + ClusterNode primary = F.first(nodes); + + assert primary != null; + + if (log.isDebugEnabled()) { + log.debug("Mapped key to primary node [key=" + entry.key() + + ", part=" + cacheCtx.affinity().partition(entry.key()) + + ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']'); + } + + // Must re-initialize cached entry while holding topology lock. + if (cacheCtx.isNear()) + entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); + else if (!cacheCtx.isLocal()) + entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true)); + else + entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); + + if (cacheCtx.isNear() || cacheCtx.isLocal()) { + if (waitLock && entry.explicitVersion() == null) + lockKeys.add(entry.txKey()); + } + + if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { + cur = new GridDistributedTxMapping(primary); + + // Initialize near flag right away. + cur.near(cacheCtx.isNear()); + } + + cur.add(entry); + + if (entry.explicitVersion() != null) { + tx.markExplicit(primary.id()); + + cur.markExplicitLock(); + } + + entry.nodeId(primary.id()); + + if (cacheCtx.isNear()) { + while (true) { + try { + GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached(); + + cached.dhtNodeId(tx.xidVersion(), primary.id()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + entry.cached(cacheCtx.near().entryEx(entry.key())); + } + } + } + + return cur; + } + + /** {@inheritDoc} */ + @Override public String toString() { + Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { + @Override public String apply(IgniteInternalFuture<?> f) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + }); + + return S.toString(GridNearOptimisticTxPrepareFuture.class, this, + "futs", futs, + "super", super.toString()); + } + + /** + * + */ + private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** Keys. */ + @GridToStringInclude + private GridDistributedTxMapping m; + + /** Flag to signal some result being processed. */ + private AtomicBoolean rcvRes = new AtomicBoolean(false); + + /** Mappings to proceed prepare. */ + private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings; + + /** + * @param m Mapping. + * @param mappings Queue of mappings to proceed with. + */ + MiniFuture( + GridDistributedTxMapping m, + ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings + ) { + this.m = m; + this.mappings = mappings; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return m.node(); + } + + /** + * @return Keys. + */ + public GridDistributedTxMapping mapping() { + return m; + } + + /** + * @param e Error. + */ + void onResult(Throwable e) { + if (rcvRes.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); + + // Fail. + onDone(e); + } + else + U.warn(log, "Received error after another result has been processed [fut=" + + GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e); + } + + /** + * @param e Node failure. + */ + void onResult(ClusterTopologyCheckedException e) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); + + // Fail the whole future (make sure not to remap on different primary node + // to prevent multiple lock coordinators). + onError(null, null, e); + } + } + + /** + * @param nodeId Failed node ID. + * @param res Result callback. + */ + void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (res.error() != null) { + // Fail the whole compound future. + onError(nodeId, mappings, res.error()); + } + else { + onPrepareResponse(m, res); + + // Proceed prepare before finishing mini future. + if (mappings != null) + proceedPrepare(mappings); + + // Finish this mini future. + onDone(tx); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } +}