http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 3a685cc..c5ef22f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; @@ -30,17 +31,17 @@ import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.io.*; import java.util.*; -import java.util.concurrent.*; -import static org.apache.ignite.internal.GridClosureCallMode.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; /** * Distributed cache implementation. @@ -142,21 +143,28 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { AffinityTopologyVersion topVer; + boolean retry; + + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + boolean skipStore = opCtx != null && opCtx.skipStore(); + do { + retry = false; + topVer = ctx.affinity().affinityTopologyVersion(); // Send job to all data nodes. Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { - CacheOperationContext opCtx = ctx.operationContextPerCall(); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, - true).get(); + retry = !ctx.kernalContext().task().execute( + new RemoveAllTask(ctx.name(), topVer, skipStore), null).get(); } } - while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0); + while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -170,7 +178,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - removeAllAsync(opFut, topVer); + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + boolean skipStore = opCtx != null && opCtx.skipStore(); + + removeAllAsync(opFut, topVer, skipStore); return opFut; } @@ -178,27 +190,29 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** * @param opFut Future. * @param topVer Topology version. + * @param skipStore Skip store flag. */ - private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) { + private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer, + final boolean skipStore) { Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { - CacheOperationContext opCtx = ctx.operationContextPerCall(); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, true); + IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute( + new RemoveAllTask(ctx.name(), topVer, skipStore), null); - rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { + rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { try { - fut.get(); + boolean retry = !fut.get(); AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion(); - if (topVer0.equals(topVer)) + if (topVer0.equals(topVer) && !retry) opFut.onDone(); else - removeAllAsync(opFut, topVer0); + removeAllAsync(opFut, topVer0, skipStore); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -227,97 +241,150 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } /** - * Internal callable which performs remove all primary key mappings - * operation on a cache with the given name. + * Remove task. */ @GridInternal - private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + private static class RemoveAllTask extends ComputeTaskAdapter<Object, Boolean> { /** */ private static final long serialVersionUID = 0L; /** Cache name. */ - private String cacheName; + private final String cacheName; - /** Topology version. */ - private AffinityTopologyVersion topVer; + /** Affinity topology version. */ + private final AffinityTopologyVersion topVer; /** Skip store flag. */ - private boolean skipStore; - - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; + private final boolean skipStore; /** - * Empty constructor for serialization. + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param skipStore Skip store flag. */ - public GlobalRemoveAllCallable() { - // No-op. + public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) { + this.cacheName = cacheName; + this.topVer = topVer; + this.skipStore = skipStore; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + + for (ClusterNode node : subgrid) + jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + IgniteException e = res.getException(); + + if (e != null) { + if (e instanceof ClusterTopologyException) + return ComputeJobResultPolicy.WAIT; + + throw new IgniteException("Remote job threw exception.", e); + } + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException { + for (ComputeJobResult locRes : results) { + if (locRes != null && (locRes.getException() != null || !locRes.<Boolean>getData())) + return false; + } + + return true; } + } + /** + * Internal job which performs remove all primary key mappings + * operation on a cache with the given name. + */ + @GridInternal + private static class GlobalRemoveAllJob<K,V> extends TopologyVersionAwareJob { + /** */ + private static final long serialVersionUID = 0L; + + /** Skip store flag. */ + private final boolean skipStore; /** * @param cacheName Cache name. * @param topVer Topology version. * @param skipStore Skip store flag. */ - private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { - this.cacheName = cacheName; - this.topVer = topVer; + private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { + super(cacheName, topVer); + this.skipStore = skipStore; } - /** - * {@inheritDoc} - */ - @Override public Object call() throws Exception { - GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); + /** {@inheritDoc} */ + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache0) { + GridCacheAdapter cache = ((IgniteKernal) ignite).context().cache().internalCache(cacheName); - final GridCacheContext<K, V> ctx = cacheAdapter.context(); + if (cache == null) + return true; - ctx.affinity().affinityReadyFuture(topVer).get(); + final GridCacheContext<K, V> ctx = cache.context(); ctx.gate().enter(); try { if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) - return null; // Ignore this remove request because remove request will be sent again. + return false; // Ignore this remove request because remove request will be sent again. GridDhtCacheAdapter<K, V> dht; GridNearCacheAdapter<K, V> near = null; - if (cacheAdapter instanceof GridNearCacheAdapter) { - near = ((GridNearCacheAdapter<K, V>)cacheAdapter); + if (cache instanceof GridNearCacheAdapter) { + near = ((GridNearCacheAdapter<K, V>) cache); dht = near.dht(); } else - dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; + dht = (GridDhtCacheAdapter<K, V>) cache; try (DataStreamerImpl<KeyCacheObject, Object> dataLdr = - (DataStreamerImpl)ignite.dataStreamer(cacheName)) { - ((DataStreamerImpl)dataLdr).maxRemapCount(0); + (DataStreamerImpl) ignite.dataStreamer(cacheName)) { + ((DataStreamerImpl) dataLdr).maxRemapCount(0); dataLdr.skipStore(skipStore); dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched()); - for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { - if (!locPart.isEmpty() && locPart.primary(topVer)) { - for (GridDhtCacheEntry o : locPart.entries()) { - if (!o.obsoleteOrDeleted()) - dataLdr.removeDataInternal(o.key()); - } - } - } + for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) { + GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false); - Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer); + if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve()) + return false; - while (it.hasNext()) - dataLdr.removeDataInternal(it.next()); + try { + if (!locPart.isEmpty()) { + for (GridDhtCacheEntry o : locPart.entries()) { + if (!o.obsoleteOrDeleted()) + dataLdr.removeDataInternal(o.key()); + } + } - it = dht.context().swap().swapKeyIterator(true, false, topVer); + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = + dht.context().swap().iterator(part); - while (it.hasNext()) - dataLdr.removeDataInternal(it.next()); + if (iter != null) { + for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) + dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey())); + } + } + finally { + locPart.release(); + } + } } if (near != null) { @@ -329,25 +396,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } } } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { ctx.gate().leave(); } - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeObject(topVer); - out.writeBoolean(skipStore); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topVer = (AffinityTopologyVersion)in.readObject(); - skipStore = in.readBoolean(); + return true; } } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index fd1040f..c5ac847 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -83,12 +82,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** Key count. */ private int txSize; - /** Group lock key if this is a group-lock transaction. */ - private IgniteTxKey grpLockKey; - - /** Partition lock flag. Only if group-lock transaction. */ - private boolean partLock; - /** * Additional flags. * GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value. @@ -116,9 +109,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { * @param timeout Lock timeout. * @param keyCnt Number of keys. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param partLock {@code True} if this is a group-lock transaction request and whole partition is - * locked. * @param skipStore Skip store flag. */ public GridDistributedLockRequest( @@ -135,8 +125,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { long timeout, int keyCnt, int txSize, - @Nullable IgniteTxKey grpLockKey, - boolean partLock, boolean skipStore ) { super(lockVer, keyCnt); @@ -156,8 +144,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { this.isInvalidate = isInvalidate; this.timeout = timeout; this.txSize = txSize; - this.grpLockKey = grpLockKey; - this.partLock = partLock; retVals = new boolean[keyCnt]; @@ -295,27 +281,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { } /** - * @return {@code True} if lock request for group-lock transaction. - */ - public boolean groupLock() { - return grpLockKey != null; - } - - /** - * @return Group lock key. - */ - @Nullable public IgniteTxKey groupLockKey() { - return grpLockKey; - } - - /** - * @return {@code True} if partition is locked in group-lock transaction. - */ - public boolean partitionLock() { - return partLock; - } - - /** * @return Max lock wait time. */ public long timeout() { @@ -330,9 +295,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { GridCacheContext cctx = ctx.cacheContext(cacheId); prepareMarshalCacheObjects(keys, cctx); - - if (grpLockKey != null) - grpLockKey.prepareMarshal(cctx); } /** {@inheritDoc} */ @@ -342,9 +304,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { GridCacheContext cctx = ctx.cacheContext(cacheId); finishUnmarshalCacheObjects(keys, cctx, ldr); - - if (grpLockKey != null) - grpLockKey.finishUnmarshal(cctx, ldr); } /** {@inheritDoc} */ @@ -375,78 +334,66 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { writer.incrementState(); case 10: - if (!writer.writeMessage("grpLockKey", grpLockKey)) - return false; - - writer.incrementState(); - - case 11: if (!writer.writeBoolean("isInTx", isInTx)) return false; writer.incrementState(); - case 12: + case 11: if (!writer.writeBoolean("isInvalidate", isInvalidate)) return false; writer.incrementState(); - case 13: + case 12: if (!writer.writeBoolean("isRead", isRead)) return false; writer.incrementState(); - case 14: + case 13: if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) return false; writer.incrementState(); - case 15: + case 14: if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 16: + case 15: if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); - case 17: + case 16: if (!writer.writeUuid("nodeId", nodeId)) return false; writer.incrementState(); - case 18: - if (!writer.writeBoolean("partLock", partLock)) - return false; - - writer.incrementState(); - - case 19: + case 17: if (!writer.writeBooleanArray("retVals", retVals)) return false; writer.incrementState(); - case 20: + case 18: if (!writer.writeLong("threadId", threadId)) return false; writer.incrementState(); - case 21: + case 19: if (!writer.writeLong("timeout", timeout)) return false; writer.incrementState(); - case 22: + case 20: if (!writer.writeInt("txSize", txSize)) return false; @@ -485,14 +432,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 10: - grpLockKey = reader.readMessage("grpLockKey"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: isInTx = reader.readBoolean("isInTx"); if (!reader.isLastRead()) @@ -500,7 +439,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 12: + case 11: isInvalidate = reader.readBoolean("isInvalidate"); if (!reader.isLastRead()) @@ -508,7 +447,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 13: + case 12: isRead = reader.readBoolean("isRead"); if (!reader.isLastRead()) @@ -516,7 +455,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 14: + case 13: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -528,7 +467,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 15: + case 14: keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -536,7 +475,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 16: + case 15: nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) @@ -544,7 +483,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 17: + case 16: nodeId = reader.readUuid("nodeId"); if (!reader.isLastRead()) @@ -552,15 +491,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 18: - partLock = reader.readBoolean("partLock"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 19: + case 17: retVals = reader.readBooleanArray("retVals"); if (!reader.isLastRead()) @@ -568,7 +499,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 20: + case 18: threadId = reader.readLong("threadId"); if (!reader.isLastRead()) @@ -576,7 +507,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 21: + case 19: timeout = reader.readLong("timeout"); if (!reader.isLastRead()) @@ -584,7 +515,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 22: + case 20: txSize = reader.readInt("txSize"); if (!reader.isLastRead()) @@ -604,7 +535,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 21; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index 9672a75..c524575 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -18,10 +18,8 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.lang.*; @@ -66,9 +64,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { /** Expected txSize. */ private int txSize; - /** Group lock key. */ - private IgniteTxKey grpLockKey; - /** System transaction flag. */ private boolean sys; @@ -95,7 +90,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. */ public GridDistributedTxFinishRequest( GridCacheVersion xidVer, @@ -111,8 +105,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, - int txSize, - @Nullable IgniteTxKey grpLockKey + int txSize ) { super(xidVer, 0); assert xidVer != null; @@ -128,7 +121,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { this.syncRollback = syncRollback; this.baseVer = baseVer; this.txSize = txSize; - this.grpLockKey = grpLockKey; completedVersions(committedVers, rolledbackVers); } @@ -219,35 +211,15 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { return commit ? syncCommit : syncRollback; } - /** - * @return {@code True} if group lock transaction. - */ - public boolean groupLock() { - return grpLockKey != null; - } - - /** - * @return Group lock key. - */ - @Nullable public IgniteTxKey groupLockKey() { - return grpLockKey; - } - /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - - if (grpLockKey != null) - grpLockKey.prepareMarshal(ctx.cacheContext(cacheId)); } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - if (grpLockKey != null) - grpLockKey.finishUnmarshal(ctx.cacheContext(cacheId), ldr); } /** {@inheritDoc} */ @@ -290,48 +262,42 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { writer.incrementState(); case 12: - if (!writer.writeMessage("grpLockKey", grpLockKey)) - return false; - - writer.incrementState(); - - case 13: if (!writer.writeBoolean("invalidate", invalidate)) return false; writer.incrementState(); - case 14: + case 13: if (!writer.writeByte("plc", plc != null ? (byte)plc.ordinal() : -1)) return false; writer.incrementState(); - case 15: + case 14: if (!writer.writeBoolean("syncCommit", syncCommit)) return false; writer.incrementState(); - case 16: + case 15: if (!writer.writeBoolean("syncRollback", syncRollback)) return false; writer.incrementState(); - case 17: + case 16: if (!writer.writeBoolean("sys", sys)) return false; writer.incrementState(); - case 18: + case 17: if (!writer.writeLong("threadId", threadId)) return false; writer.incrementState(); - case 19: + case 18: if (!writer.writeInt("txSize", txSize)) return false; @@ -386,14 +352,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { reader.incrementState(); case 12: - grpLockKey = reader.readMessage("grpLockKey"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: invalidate = reader.readBoolean("invalidate"); if (!reader.isLastRead()) @@ -401,7 +359,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 14: + case 13: byte plcOrd; plcOrd = reader.readByte("plc"); @@ -413,7 +371,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 15: + case 14: syncCommit = reader.readBoolean("syncCommit"); if (!reader.isLastRead()) @@ -421,7 +379,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 16: + case 15: syncRollback = reader.readBoolean("syncRollback"); if (!reader.isLastRead()) @@ -429,7 +387,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 17: + case 16: sys = reader.readBoolean("sys"); if (!reader.isLastRead()) @@ -437,7 +395,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 18: + case 17: threadId = reader.readLong("threadId"); if (!reader.isLastRead()) @@ -445,7 +403,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 19: + case 18: txSize = reader.readInt("txSize"); if (!reader.isLastRead()) @@ -465,7 +423,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 20; + return 19; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index aba9e86..d11b879 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -88,18 +88,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @GridDirectCollection(GridCacheVersion.class) private Collection<GridCacheVersion> dhtVerVals; - /** Group lock key, if any. */ - @GridToStringInclude - @GridDirectTransient - private IgniteTxKey grpLockKey; - - /** Group lock key bytes. */ - @GridToStringExclude - private ByteBuffer grpLockKeyBytes; - - /** Partition lock flag. */ - private boolean partLock; - /** Expected transaction size. */ private int txSize; @@ -130,8 +118,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage * @param tx Cache transaction. * @param reads Read entries. * @param writes Write entries. - * @param grpLockKey Group lock key. - * @param partLock {@code True} if preparing group-lock transaction with partition lock. * @param txNodes Transaction nodes mapping. * @param onePhaseCommit One phase commit flag. */ @@ -139,8 +125,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage IgniteInternalTx tx, @Nullable Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes, - IgniteTxKey grpLockKey, - boolean partLock, Map<UUID, Collection<UUID>> txNodes, boolean onePhaseCommit ) { @@ -158,8 +142,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage this.reads = reads; this.writes = writes; - this.grpLockKey = grpLockKey; - this.partLock = partLock; this.txNodes = txNodes; this.onePhaseCommit = onePhaseCommit; } @@ -272,20 +254,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage } /** - * @return Group lock key if preparing group-lock transaction. - */ - @Nullable public IgniteTxKey groupLockKey() { - return grpLockKey; - } - - /** - * @return {@code True} if preparing group-lock transaction with partition lock. - */ - public boolean partitionLock() { - return partLock; - } - - /** * @return Expected transaction size. */ public int txSize() { @@ -310,9 +278,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage if (reads != null) marshalTx(reads, ctx); - if (grpLockKey != null && grpLockKeyBytes == null) - grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey); - if (dhtVers != null) { for (IgniteTxKey key : dhtVers.keySet()) { GridCacheContext cctx = ctx.cacheContext(key.cacheId()); @@ -338,9 +303,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage if (reads != null) unmarshalTx(reads, false, ctx, ldr); - if (grpLockKeyBytes != null && grpLockKey == null) - grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr); - if (dhtVerKeys != null && dhtVers == null) { assert dhtVerVals != null; assert dhtVerKeys.size() == dhtVerVals.size(); @@ -397,84 +359,72 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage writer.incrementState(); case 11: - if (!writer.writeByteBuffer("grpLockKeyBytes", grpLockKeyBytes)) - return false; - - writer.incrementState(); - - case 12: if (!writer.writeBoolean("invalidate", invalidate)) return false; writer.incrementState(); - case 13: + case 12: if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) return false; writer.incrementState(); - case 14: + case 13: if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) return false; writer.incrementState(); - case 15: - if (!writer.writeBoolean("partLock", partLock)) - return false; - - writer.incrementState(); - - case 16: + case 14: if (!writer.writeByte("plc", plc != null ? (byte)plc.ordinal() : -1)) return false; writer.incrementState(); - case 17: + case 15: if (!writer.writeCollection("reads", reads, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 18: + case 16: if (!writer.writeBoolean("sys", sys)) return false; writer.incrementState(); - case 19: + case 17: if (!writer.writeLong("threadId", threadId)) return false; writer.incrementState(); - case 20: + case 18: if (!writer.writeLong("timeout", timeout)) return false; writer.incrementState(); - case 21: + case 19: if (!writer.writeByteBuffer("txNodesBytes", txNodesBytes)) return false; writer.incrementState(); - case 22: + case 20: if (!writer.writeInt("txSize", txSize)) return false; writer.incrementState(); - case 23: + case 21: if (!writer.writeMessage("writeVer", writeVer)) return false; writer.incrementState(); - case 24: + case 22: if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG)) return false; @@ -525,14 +475,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 11: - grpLockKeyBytes = reader.readByteBuffer("grpLockKeyBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: invalidate = reader.readBoolean("invalidate"); if (!reader.isLastRead()) @@ -540,7 +482,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 13: + case 12: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -552,7 +494,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 14: + case 13: onePhaseCommit = reader.readBoolean("onePhaseCommit"); if (!reader.isLastRead()) @@ -560,15 +502,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 15: - partLock = reader.readBoolean("partLock"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 16: + case 14: byte plcOrd; plcOrd = reader.readByte("plc"); @@ -580,7 +514,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 17: + case 15: reads = reader.readCollection("reads", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -588,7 +522,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 18: + case 16: sys = reader.readBoolean("sys"); if (!reader.isLastRead()) @@ -596,7 +530,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 19: + case 17: threadId = reader.readLong("threadId"); if (!reader.isLastRead()) @@ -604,7 +538,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 20: + case 18: timeout = reader.readLong("timeout"); if (!reader.isLastRead()) @@ -612,7 +546,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 21: + case 19: txNodesBytes = reader.readByteBuffer("txNodesBytes"); if (!reader.isLastRead()) @@ -620,7 +554,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 22: + case 20: txSize = reader.readInt("txSize"); if (!reader.isLastRead()) @@ -628,7 +562,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 23: + case 21: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -636,7 +570,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 24: + case 22: writes = reader.readCollection("writes", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -656,7 +590,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 25; + return 23; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 3215138..8594853 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -95,7 +95,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @param invalidate Invalidate flag. * @param timeout Timeout. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. * @param subjId Subject ID. * @param taskNameHash Task name hash code. */ @@ -112,7 +111,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter boolean invalidate, long timeout, int txSize, - @Nullable IgniteTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) { @@ -128,7 +126,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter isolation, timeout, txSize, - grpLockKey, subjId, taskNameHash); @@ -195,16 +192,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // No-op. } - /** - * Adds group lock key to remote transaction. - * - * @param key Key. - */ - public void groupLockKey(IgniteTxKey key) { - if (grpLockKey == null) - grpLockKey = key; - } - /** {@inheritDoc} */ @Override public GridTuple<CacheObject> peek(GridCacheContext cacheCtx, boolean failFast, @@ -350,7 +337,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter entry.op(e.op()); entry.ttl(e.ttl()); entry.explicitVersion(e.explicitVersion()); - entry.groupLockEntry(e.groupLockEntry()); // Conflict resolution stuff. entry.conflictVersion(e.conflictVersion()); @@ -446,7 +432,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter GridCacheVersion ver = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer; // If locks haven't been acquired yet, keep waiting. - if (!txEntry.groupLockEntry() && !Entry.lockedBy(ver)) { + if (!Entry.lockedBy(ver)) { if (log.isDebugEnabled()) log.debug("Transaction does not own lock for entry (will wait) [entry=" + Entry + ", tx=" + this + ']'); @@ -607,10 +593,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } // No-op. else { - assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()): - "Transaction does not own lock for group lock entry during commit [tx=" + - this + ", txEntry=" + txEntry + ']'; - if (conflictCtx == null || !conflictCtx.isUseOld()) { if (txEntry.ttl() != CU.TTL_NOT_CHANGED) cached.updateTtl(null, txEntry.ttl()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 1c46fd0..23060e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -102,6 +102,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridCacheMapEntry next, int hdrId) { + if (ctx.useOffheapEntry()) + return new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash, val, next, hdrId); + return new GridDhtCacheEntry(ctx, topVer, key, hash, val, next, hdrId); } }); @@ -343,17 +346,24 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap public GridCacheEntryEx entryExx(KeyCacheObject key, AffinityTopologyVersion topVer, boolean allowDetached, boolean touch) { try { return allowDetached && !ctx.affinity().localNode(key, topVer) ? - new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0) : - entryEx(key, touch); + createEntry(key) : entryEx(key, touch); } catch (GridDhtInvalidPartitionException e) { if (!allowDetached) throw e; - return new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0); + return createEntry(key); } } + /** + * @param key Key for which entry should be returned. + * @return Cache entry. + */ + protected GridDistributedCacheEntry createEntry(KeyCacheObject key) { + return new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0); + } + /** {@inheritDoc} */ @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 5b0275c..c57eded 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -833,8 +833,6 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo cnt, 0, inTx() ? tx.size() : cnt, - inTx() ? tx.groupLockKey() : null, - inTx() && tx.partitionLock(), inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index b8e57a4..e08344f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -22,7 +22,6 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; @@ -101,8 +100,6 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { * @param dhtCnt DHT count. * @param nearCnt Near count. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key. - * @param partLock {@code True} if partition lock. * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param accessTtl TTL for read operation. @@ -125,8 +122,6 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { int dhtCnt, int nearCnt, int txSize, - @Nullable IgniteTxKey grpLockKey, - boolean partLock, @Nullable UUID subjId, int taskNameHash, long accessTtl, @@ -145,8 +140,6 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { timeout, dhtCnt == 0 ? nearCnt : dhtCnt, txSize, - grpLockKey, - partLock, skipStore); this.topVer = topVer; @@ -331,55 +324,55 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { } switch (writer.state()) { - case 23: + case 21: if (!writer.writeLong("accessTtl", accessTtl)) return false; writer.incrementState(); - case 24: + case 22: if (!writer.writeBitSet("invalidateEntries", invalidateEntries)) return false; writer.incrementState(); - case 25: + case 23: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 26: + case 24: if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 27: + case 25: if (!writer.writeByteBuffer("ownedBytes", ownedBytes)) return false; writer.incrementState(); - case 28: + case 26: if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); - case 29: + case 27: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 30: + case 28: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 31: + case 29: if (!writer.writeMessage("topVer", topVer)) return false; @@ -401,7 +394,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { return false; switch (reader.state()) { - case 23: + case 21: accessTtl = reader.readLong("accessTtl"); if (!reader.isLastRead()) @@ -409,7 +402,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 24: + case 22: invalidateEntries = reader.readBitSet("invalidateEntries"); if (!reader.isLastRead()) @@ -417,7 +410,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 25: + case 23: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -425,7 +418,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 26: + case 24: nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -433,7 +426,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 27: + case 25: ownedBytes = reader.readByteBuffer("ownedBytes"); if (!reader.isLastRead()) @@ -441,7 +434,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 28: + case 26: preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) @@ -449,7 +442,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 29: + case 27: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -457,7 +450,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 30: + case 28: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -465,7 +458,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 31: + case 29: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -485,7 +478,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 32; + return 30; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtOffHeapCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtOffHeapCacheEntry.java new file mode 100644 index 0000000..1191d83 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtOffHeapCacheEntry.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; + +/** + * Replicated cache entry for off-heap tiered or off-heap values modes. + */ +public class GridDhtOffHeapCacheEntry extends GridDhtCacheEntry { + /** Off-heap value pointer. */ + private long valPtr; + + /** + * @param ctx Cache context. + * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). + * @param key Cache key. + * @param hash Key hash value. + * @param val Entry value. + * @param next Next entry in the linked list. + * @param hdrId Header id. + */ + public GridDhtOffHeapCacheEntry(GridCacheContext ctx, + AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + int hdrId) { + super(ctx, topVer, key, hash, val, next, hdrId); + } + + /** {@inheritDoc} */ + @Override protected boolean hasOffHeapPointer() { + return valPtr != 0; + } + + /** {@inheritDoc} */ + @Override protected long offHeapPointer() { + return valPtr; + } + + /** {@inheritDoc} */ + @Override protected void offHeapPointer(long valPtr) { + this.valPtr = valPtr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 87026f3..26eef50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -174,7 +174,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Invalidate key in near cache, if any. if (isNearEnabled(cacheCfg)) - obsoleteNearEntry(key, req.version()); + obsoleteNearEntry(key); break; } @@ -203,7 +203,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.isInvalidate(), req.timeout(), req.txSize(), - req.groupLockKey(), req.subjectId(), req.taskNameHash()); @@ -222,9 +221,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach null, req.accessTtl(), req.skipStore()); - - if (req.groupLock()) - tx.groupLockKey(txKey); } entry = entryExx(key, req.topologyVersion()); @@ -291,7 +287,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Invalidate key in near cache, if any. if (isNearEnabled(cacheCfg)) - obsoleteNearEntry(key, req.version()); + obsoleteNearEntry(key); if (tx != null) { tx.clearEntry(txKey); @@ -810,8 +806,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.isInvalidate(), false, req.txSize(), - req.groupLockKey(), - req.partitionLock(), null, req.subjectId(), req.taskNameHash()); @@ -1481,12 +1475,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach /** * @param key Key - * @param ver Version. */ - private void obsoleteNearEntry(KeyCacheObject key, GridCacheVersion ver) { + private void obsoleteNearEntry(KeyCacheObject key) { GridCacheEntryEx nearEntry = near().peekEx(key); if (nearEntry != null) - nearEntry.markObsolete(ver); + nearEntry.markObsolete(ctx.versions().next()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 7c35fc5..7fd79e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -309,7 +309,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.rolledbackVersions(), tx.pendingVersions(), tx.size(), - tx.groupLockKey(), tx.subjectId(), tx.taskNameHash()); @@ -387,7 +386,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.rolledbackVersions(), tx.pendingVersions(), tx.size(), - tx.groupLockKey(), tx.subjectId(), tx.taskNameHash()); @@ -439,7 +437,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.rolledbackVersions(), tx.pendingVersions(), tx.size(), - tx.groupLockKey(), tx.subjectId(), tx.taskNameHash()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index d20a7c3..7b077c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -21,7 +21,6 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -97,7 +96,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { * @param rolledbackVers Rolled back versions. * @param pendingVers Pending versions. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key. * @param subjId Subject ID. * @param taskNameHash Task name hash. */ @@ -122,12 +120,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers, int txSize, - @Nullable IgniteTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) { super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer, - committedVers, rolledbackVers, txSize, grpLockKey); + committedVers, rolledbackVers, txSize); assert miniId != null; assert nearNodeId != null; @@ -241,55 +238,55 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { } switch (writer.state()) { - case 20: + case 19: if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) return false; writer.incrementState(); - case 21: + case 20: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 22: + case 21: if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); - case 23: + case 22: if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 24: + case 23: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 25: + case 24: if (!writer.writeBoolean("sysInvalidate", sysInvalidate)) return false; writer.incrementState(); - case 26: + case 25: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 27: + case 26: if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); - case 28: + case 27: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -311,7 +308,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { return false; switch (reader.state()) { - case 20: + case 19: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -323,7 +320,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 21: + case 20: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -331,7 +328,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 22: + case 21: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) @@ -339,7 +336,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 23: + case 22: pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -347,7 +344,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 24: + case 23: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -355,7 +352,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 25: + case 24: sysInvalidate = reader.readBoolean("sysInvalidate"); if (!reader.isLastRead()) @@ -363,7 +360,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 26: + case 25: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -371,7 +368,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 27: + case 26: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -379,7 +376,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 28: + case 27: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -399,6 +396,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 29; + return 28; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 07ced0d..841cac8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -93,8 +93,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @param timeout Timeout. * @param storeEnabled Store enabled flag. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param partLock {@code True} if this is a group-lock transaction and whole partition should be locked. * @param txNodes Transaction nodes mapping. */ public GridDhtTxLocal( @@ -115,8 +113,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa boolean invalidate, boolean storeEnabled, int txSize, - @Nullable IgniteTxKey grpLockKey, - boolean partLock, Map<UUID, Collection<UUID>> txNodes, UUID subjId, int taskNameHash @@ -135,8 +131,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa invalidate, storeEnabled, txSize, - grpLockKey, - partLock, subjId, taskNameHash); @@ -284,7 +278,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() { + @Override public IgniteInternalFuture<?> prepareAsync() { if (optimistic()) { assert isSystemInvalidate(); @@ -296,7 +290,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa nearMiniId, null, true, - null, null); } @@ -305,14 +298,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (fut == null) { // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>( + if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture( cctx, this, nearMiniId, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), true, needReturnValue(), - null, null))) return prepFut.get(); } @@ -371,7 +363,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ - public IgniteInternalFuture<IgniteInternalTx> prepareAsync( + public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync( @Nullable Iterable<IgniteTxEntry> reads, @Nullable Iterable<IgniteTxEntry> writes, Map<IgniteTxKey, GridCacheVersion> verMap, @@ -379,8 +371,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa IgniteUuid nearMiniId, Map<UUID, Collection<UUID>> txNodes, boolean last, - Collection<UUID> lastBackups, - IgniteInClosure<GridNearTxPrepareResponse> completeCb + Collection<UUID> lastBackups ) { // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture fut = prepFut.get(); @@ -389,21 +380,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa init(); // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>( + if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture( cctx, this, nearMiniId, verMap, last, needReturnValue(), - lastBackups, - completeCb))) { + lastBackups))) { GridDhtTxPrepareFuture f = prepFut.get(); assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; - return f; + return chainOnePhasePrepare(f); } } else { @@ -411,7 +401,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; // Prepare was called explicitly. - return fut; + return chainOnePhasePrepare(fut); } if (state() != PREPARING) { @@ -475,7 +465,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } } - return fut; + return chainOnePhasePrepare(fut); } /** {@inheritDoc} */ @@ -517,8 +507,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } } else - prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { + prep.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { try { f.get(); // Check for errors of a parent future. @@ -605,8 +595,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa else { prepFut.complete(); - prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { + prepFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { try { f.get(); // Check for errors of a parent future. } @@ -686,7 +676,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() { + @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() { return prepFut.get(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 08fcaf6..54b59b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -93,8 +94,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param isolation Isolation. * @param timeout Timeout. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param partLock If this is a group-lock transaction and the whole partition should be locked. */ protected GridDhtTxLocalAdapter( GridCacheSharedContext cctx, @@ -110,13 +109,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { boolean invalidate, boolean storeEnabled, int txSize, - @Nullable IgniteTxKey grpLockKey, - boolean partLock, @Nullable UUID subjId, int taskNameHash ) { super(cctx, xidVer, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, invalidate, - storeEnabled, txSize, grpLockKey, partLock, subjId, taskNameHash); + storeEnabled, txSize, subjId, taskNameHash); assert cctx != null; @@ -732,68 +729,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** {@inheritDoc} */ - @Override protected void addGroupTxMapping(Collection<IgniteTxKey> keys) { - assert groupLock(); - - for (GridDistributedTxMapping mapping : dhtMap.values()) - mapping.entries(Collections.unmodifiableCollection(txMap.values()), true); - - // Here we know that affinity key for all given keys is our group lock key. - // Just add entries to dht mapping. - // Add near readers. If near cache is disabled on all nodes, do nothing. - Collection<UUID> backupIds = dhtMap.keySet(); - - Map<ClusterNode, List<GridDhtCacheEntry>> locNearMap = null; - - for (IgniteTxKey key : keys) { - IgniteTxEntry txEntry = entry(key); - - if (!txEntry.groupLockEntry() || txEntry.context().isNear()) - continue; - - assert txEntry.cached() instanceof GridDhtCacheEntry : "Invalid entry type: " + txEntry.cached(); - - while (true) { - try { - GridDhtCacheEntry entry = (GridDhtCacheEntry)txEntry.cached(); - - Collection<UUID> readers = entry.readers(); - - if (!F.isEmpty(readers)) { - Collection<ClusterNode> nearNodes = cctx.discovery().nodes(readers, F0.notEqualTo(nearNodeId()), - F.notIn(backupIds)); - - if (log.isDebugEnabled()) - log.debug("Mapping entry to near nodes [nodes=" + U.nodeIds(nearNodes) + ", entry=" + - entry + ']'); - - for (ClusterNode n : nearNodes) { - if (locNearMap == null) - locNearMap = new HashMap<>(); - - List<GridDhtCacheEntry> entries = locNearMap.get(n); - - if (entries == null) - locNearMap.put(n, entries = new LinkedList<>()); - - entries.add(entry); - } - } - - break; - } - catch (GridCacheEntryRemovedException ignored) { - // Retry. - txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion())); - } - } - } - - if (locNearMap != null) - addNearNodeEntryMapping(locNearMap); - } - - /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) @Override public boolean finish(boolean commit) throws IgniteCheckedException { if (log.isDebugEnabled()) @@ -885,6 +820,32 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { */ protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut); + /** + * @return {@code True} if transaction is finished on prepare step. + */ + protected final boolean commitOnPrepare() { + return onePhaseCommit() && !near(); + } + + /** + * @param prepFut Prepare future. + * @return If transaction if finished on prepare step returns future which is completed after transaction finish. + */ + protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare( + final GridDhtTxPrepareFuture prepFut) { + if (commitOnPrepare()) { + return finishFuture().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridNearTxPrepareResponse>() { + @Override public GridNearTxPrepareResponse applyx(IgniteInternalFuture<IgniteInternalTx> finishFut) + throws IgniteCheckedException + { + return prepFut.get(); + } + }); + } + + return prepFut; + } + /** {@inheritDoc} */ @Override public void rollback() throws IgniteCheckedException { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java index d207d76..ba2c35f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java @@ -28,7 +28,7 @@ import java.util.*; /** * DHT transaction mapping. */ -public class GridDhtTxMapping<K, V> { +public class GridDhtTxMapping { /** Transaction nodes mapping (primary node -> related backup nodes). */ private final Map<UUID, Collection<UUID>> txNodes = new GridLeanMap<>();