http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java new file mode 100644 index 0000000..bce62c1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.transactions.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; +import static org.apache.ignite.transactions.TransactionState.*; + +/** + * + */ +public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter { + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearPessimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.pessimistic() : tx; + + // Should wait for all mini futures completion before finishing tx. + ignoreChildFailures(IgniteCheckedException.class); + } + + /** {@inheritDoc} */ + @Override public Collection<? extends ClusterNode> nodes() { + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { + return ((MiniFuture)f).node(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + + for (IgniteInternalFuture<?> fut : futures()) { + MiniFuture f = (MiniFuture)fut; + + if (f.node().id().equals(nodeId)) { + f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + + found = true; + } + } + + return found; + } + + /** {@inheritDoc} */ + @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (!isDone()) { + for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f); + + f.onResult(res); + } + } + } + } + /** {@inheritDoc} */ + @Override public void prepare() { + if (!tx.state(PREPARING)) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx)); + else + onDone(new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } + else + onDone(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + + return; + } + + try { + tx.userPrepare(); + + cctx.mvcc().addFuture(this); + + preparePessimistic(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + + /** + * + */ + private void preparePessimistic() { + Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); + + AffinityTopologyVersion topVer = tx.topologyVersion(); + + txMapping = new GridDhtTxMapping(); + + for (IgniteTxEntry txEntry : tx.allEntries()) { + GridCacheContext cacheCtx = txEntry.context(); + + List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer); + + ClusterNode primary = F.first(nodes); + + boolean near = cacheCtx.isNear(); + + IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near); + + GridDistributedTxMapping nodeMapping = mappings.get(key); + + if (nodeMapping == null) { + nodeMapping = new GridDistributedTxMapping(primary); + + nodeMapping.near(cacheCtx.isNear()); + + mappings.put(key, nodeMapping); + } + + txEntry.nodeId(primary.id()); + + nodeMapping.add(txEntry); + + txMapping.addMapping(nodes); + } + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + for (final GridDistributedTxMapping m : mappings.values()) { + final ClusterNode node = m.node(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + m.reads(), + m.writes(), + m.near(), + txMapping.transactionNodes(), + true, + txMapping.transactionNodes().get(node.id()), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash()); + + for (IgniteTxEntry txEntry : m.writes()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + final MiniFuture fut = new MiniFuture(m); + + req.miniId(fut.futureId()); + + add(fut); + + if (node.isLocal()) { + IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(node.id(), + tx, + req); + + prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { + @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { + try { + fut.onResult(prepFut.get()); + } + catch (IgniteCheckedException e) { + fut.onError(e); + } + } + }); + } + else { + try { + cctx.io().send(node, req, tx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + fut.onNodeLeft(e); + } + catch (IgniteCheckedException e) { + fut.onError(e); + } + } + } + + markInitialized(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) { + if (err != null) + this.err.compareAndSet(null, err); + + err = this.err.get(); + + if (err == null) + tx.state(PREPARED); + + if (super.onDone(tx, err)) { + cctx.mvcc().removeFuture(this); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { + @Override public String apply(IgniteInternalFuture<?> f) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + }); + + return S.toString(GridNearPessimisticTxPrepareFuture.class, this, + "futs", futs, + "super", super.toString()); + } + + /** + * + */ + private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** */ + private GridDistributedTxMapping m; + + /** + * @param m Mapping. + */ + MiniFuture(GridDistributedTxMapping m) { + this.m = m; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return m.node(); + } + + /** + * @param res Response. + */ + void onResult(GridNearTxPrepareResponse res) { + if (res.error() != null) + onError(res.error()); + else { + onPrepareResponse(m, res); + + onDone(tx); + } + } + + /** + * @param e Error. + */ + void onNodeLeft(ClusterTopologyCheckedException e) { + onError(e); + } + + /** + * @param e Error. + */ + void onError(Throwable e) { + if (isDone()) { + U.warn(log, "Received error when future is done [fut=" + this + ", err=" + e + ", tx=" + tx + ']'); + + return; + } + + if (log.isDebugEnabled()) + log.debug("Error on tx prepare [fut=" + this + ", err=" + e + ", tx=" + tx + ']'); + + if (err.compareAndSet(null, e)) + tx.setRollbackOnly(); + + onDone(e); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 581c7e0..df7a65f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -301,14 +301,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> req.isInvalidate(), req.timeout(), req.txSize(), - req.groupLockKey(), req.subjectId(), req.taskNameHash() ); - if (req.groupLock()) - tx.groupLockKey(txKey); - tx = ctx.tm().onCreated(null, tx); if (tx == null || !ctx.tm().onStarted(tx)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index 7b0b811..b44f821 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -97,7 +97,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { @Nullable UUID subjId, int taskNameHash) { super(xidVer, futId, null, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer, - committedVers, rolledbackVers, txSize, null); + committedVers, rolledbackVers, txSize); this.explicitLock = explicitLock; this.storeEnabled = storeEnabled; @@ -170,37 +170,37 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { } switch (writer.state()) { - case 20: + case 19: if (!writer.writeBoolean("explicitLock", explicitLock)) return false; writer.incrementState(); - case 21: + case 20: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 22: + case 21: if (!writer.writeBoolean("storeEnabled", storeEnabled)) return false; writer.incrementState(); - case 23: + case 22: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 24: + case 23: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 25: + case 24: if (!writer.writeMessage("topVer", topVer)) return false; @@ -222,7 +222,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { return false; switch (reader.state()) { - case 20: + case 19: explicitLock = reader.readBoolean("explicitLock"); if (!reader.isLastRead()) @@ -230,7 +230,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 21: + case 20: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -238,7 +238,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 22: + case 21: storeEnabled = reader.readBoolean("storeEnabled"); if (!reader.isLastRead()) @@ -246,7 +246,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 23: + case 22: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -254,7 +254,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 24: + case 23: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -262,7 +262,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 25: + case 24: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -282,7 +282,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 26; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index c665354..5c426ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -61,8 +61,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** Future. */ @GridToStringExclude - private final AtomicReference<IgniteInternalFuture<IgniteInternalTx>> prepFut = - new AtomicReference<>(); + private final AtomicReference<IgniteInternalFuture<?>> prepFut = new AtomicReference<>(); /** */ @GridToStringExclude @@ -103,8 +102,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @param timeout Timeout. * @param storeEnabled Store enabled flag. * @param txSize Transaction size. - * @param grpLockKey Group lock key if this is a group lock transaction. - * @param partLock {@code True} if this is a group-lock transaction and the whole partition should be locked. * @param subjId Subject ID. * @param taskNameHash Task name hash code. */ @@ -119,8 +116,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { long timeout, boolean storeEnabled, int txSize, - @Nullable IgniteTxKey grpLockKey, - boolean partLock, @Nullable UUID subjId, int taskNameHash ) { @@ -138,8 +133,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { false, storeEnabled, txSize, - grpLockKey, - partLock, subjId, taskNameHash); @@ -273,9 +266,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> optimisticLockEntries() { - if (groupLock()) - return super.optimisticLockEntries(); - return optimisticLockEntries; } @@ -417,13 +407,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } } - /** {@inheritDoc} */ - @Override protected void addGroupTxMapping(Collection<IgniteTxKey> keys) { - super.addGroupTxMapping(keys); - - addKeyMapping(cctx.localNode(), keys); - } - /** * Adds key mapping to dht mapping. * @@ -563,9 +546,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { - Collection<IgniteTxEntry> entries = groupLock() ? - Collections.singletonList(groupLockEntry()) : - F.concat(false, mapping.reads(), mapping.writes()); + Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes()); for (IgniteTxEntry txEntry : entries) { while (true) { @@ -682,12 +663,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() { - GridNearTxPrepareFuture fut = (GridNearTxPrepareFuture)prepFut.get(); + @Override public IgniteInternalFuture<?> prepareAsync() { + GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut.get(); if (fut == null) { // Future must be created before any exception can be thrown. - fut = new GridNearTxPrepareFuture<>(cctx, this); + fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, this) : + new GridNearPessimisticTxPrepareFuture(cctx, this); if (!prepFut.compareAndSet(null, fut)) return prepFut.get(); @@ -698,41 +680,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { mapExplicitLocks(); - // For pessimistic mode we don't distribute prepare request and do not lock topology version - // as it was fixed on first lock. - if (pessimistic()) { - if (!state(PREPARING)) { - if (setRollbackOnly()) { - if (timedOut()) - fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was " + - "rolled back: " + this)); - else - fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + - state() + ", tx=" + this + ']')); - } - else - fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " + - "[state=" + state() + ", tx=" + this + ']')); - - return fut; - } - - try { - userPrepare(); - - // Make sure to add future before calling prepare. - cctx.mvcc().addFuture(fut); - - fut.prepare(); - } - catch (IgniteCheckedException e) { - fut.onError(e); - } - } - else { - // In optimistic mode we must wait for topology map update. - fut.prepare(); - } + fut.prepare(); return fut; } @@ -752,10 +700,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { cctx.mvcc().addFuture(fut); - IgniteInternalFuture<IgniteInternalTx> prepareFut = prepFut.get(); + IgniteInternalFuture<?> prepareFut = prepFut.get(); - prepareFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { + prepareFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { GridNearTxFinishFuture fut0 = commitFut.get(); try { @@ -799,7 +747,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { cctx.mvcc().addFuture(fut); - IgniteInternalFuture<IgniteInternalTx> prepFut = this.prepFut.get(); + IgniteInternalFuture<?> prepFut = this.prepFut.get(); if (prepFut == null || prepFut.isDone()) { try { @@ -823,8 +771,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } } else { - prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { + prepFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { try { // Check for errors in prepare future. f.get(); @@ -867,12 +815,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @return Future that will be completed when locks are acquired. */ @SuppressWarnings("TypeMayBeWeakened") - public IgniteInternalFuture<IgniteInternalTx> prepareAsyncLocal( + public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal( @Nullable Collection<IgniteTxEntry> reads, @Nullable Collection<IgniteTxEntry> writes, Map<UUID, Collection<UUID>> txNodes, boolean last, - Collection<UUID> lastBackups, - IgniteInClosure<GridNearTxPrepareResponse> completeCb + Collection<UUID> lastBackups ) { if (state() != PREPARING) { if (timedOut()) @@ -887,15 +834,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { init(); - GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture<>( + GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture( cctx, this, IgniteUuid.randomUuid(), Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), last, needReturnValue() && implicit(), - lastBackups, - completeCb); + lastBackups); try { // At this point all the entries passed in must be enlisted in transaction because this is an @@ -934,7 +880,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } } - return fut; + return chainOnePhasePrepare(fut); } /** @@ -950,7 +896,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (pessimistic()) prepareAsync(); - IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get(); + IgniteInternalFuture<?> prep = prepFut.get(); // Do not create finish future if there are no remote nodes. if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) { @@ -986,8 +932,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } } else - prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { + prep.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { try { f.get(); // Check for errors of a parent future. @@ -1023,7 +969,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { cctx.mvcc().addFuture(fut); - IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get(); + IgniteInternalFuture<?> prep = prepFut.get(); if (prep == null || prep.isDone()) { try { @@ -1039,8 +985,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { fut.finish(); } else - prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { + prep.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { try { f.get(); // Check for errors of a parent future. } @@ -1233,7 +1179,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() { + @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() { return prepFut.get(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java deleted file mode 100644 index f573187..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ /dev/null @@ -1,1050 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.processors.affinity.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.transactions.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.jetbrains.annotations.*; -import org.jsr166.*; - -import javax.cache.expiry.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; -import static org.apache.ignite.transactions.TransactionState.*; - -/** - * - */ -public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx> - implements GridCacheMvccFuture<IgniteInternalTx> { - /** */ - private static final long serialVersionUID = 0L; - - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - private static IgniteLogger log; - - /** Context. */ - private GridCacheSharedContext<K, V> cctx; - - /** Future ID. */ - private IgniteUuid futId; - - /** Transaction. */ - @GridToStringInclude - private GridNearTxLocal tx; - - /** Error. */ - @GridToStringExclude - private AtomicReference<Throwable> err = new AtomicReference<>(null); - - /** Trackable flag. */ - private boolean trackable = true; - - /** Full information about transaction nodes mapping. */ - private GridDhtTxMapping<K, V> txMapping; - - /** */ - private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); - - /** - * @param cctx Context. - * @param tx Transaction. - */ - public GridNearTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridNearTxLocal tx) { - super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() { - @Override public boolean collect(IgniteInternalTx e) { - return true; - } - - @Override public IgniteInternalTx reduce() { - // Nothing to aggregate. - return tx; - } - }); - - assert cctx != null; - assert tx != null; - - this.cctx = cctx; - this.tx = tx; - - futId = IgniteUuid.randomUuid(); - - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFuture.class); - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return futId; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** {@inheritDoc} */ - @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { - if (log.isDebugEnabled()) - log.debug("Transaction future received owner changed callback: " + entry); - - if (tx.optimistic()) { - if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { - lockKeys.remove(entry.txKey()); - - // This will check for locks. - onDone(); - - return true; - } - } - - return false; - } - - /** - * @return Involved nodes. - */ - @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - - /** {@inheritDoc} */ - @Override public boolean trackable() { - return trackable; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - trackable = false; - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - boolean found = false; - - for (IgniteInternalFuture<?> fut : futures()) - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.node().id().equals(nodeId)) { - f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); - - found = true; - } - } - - return found; - } - - /** - * @param nodeId Failed node ID. - * @param mappings Remaining mappings. - * @param e Error. - */ - void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) { - if (err.compareAndSet(null, e)) { - boolean marked = tx.setRollbackOnly(); - - if (e instanceof IgniteTxOptimisticCheckedException) { - assert nodeId != null : "Missing node ID for optimistic failure exception: " + e; - - tx.removeKeysMapping(nodeId, mappings); - } - - if (e instanceof IgniteTxRollbackCheckedException) { - if (marked) { - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to automatically rollback transaction: " + tx, ex); - } - } - } - - onComplete(); - } - } - - /** - * @return {@code True} if all locks are owned. - */ - private boolean checkLocks() { - boolean locked = lockKeys.isEmpty(); - - if (locked) { - if (log.isDebugEnabled()) - log.debug("All locks are acquired for near prepare future: " + this); - } - else { - if (log.isDebugEnabled()) - log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); - } - - return locked; - } - - /** - * @param e Error. - */ - void onError(Throwable e) { - onError(null, null, e); - } - - /** - * @param nodeId Sender. - * @param res Result. - */ - public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { - if (!isDone()) { - for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.futureId().equals(res.miniId())) { - assert f.node().id().equals(nodeId); - - f.onResult(nodeId, res); - } - } - } - } - } - - /** {@inheritDoc} */ - @Override public boolean onDone(IgniteInternalTx t, Throwable err) { - // If locks were not acquired yet, delay completion. - if (isDone() || (err == null && !checkLocks())) - return false; - - this.err.compareAndSet(null, err); - - if (err == null) - tx.state(PREPARED); - - if (super.onDone(tx, err)) { - // Don't forget to clean up. - cctx.mvcc().removeFuture(this); - - return true; - } - - return false; - } - - /** - * @param f Future. - * @return {@code True} if mini-future. - */ - private boolean isMini(IgniteInternalFuture<?> f) { - return f.getClass().equals(MiniFuture.class); - } - - /** - * Completeness callback. - */ - private void onComplete() { - if (super.onDone(tx, err.get())) - // Don't forget to clean up. - cctx.mvcc().removeFuture(this); - } - - /** - * Completes this future. - */ - void complete() { - onComplete(); - } - - /** - * Waits for topology exchange future to be ready and then prepares user transaction. - */ - public void prepare() { - if (tx.optimistic()) { - // Obtain the topology version to use. - AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); - - if (topVer != null) { - tx.topologyVersion(topVer); - - prepare0(); - - return; - } - - prepareOnTopology(); - - } - else - preparePessimistic(); - } - - /** - * - */ - private void prepareOnTopology() { - GridDhtTopologyFuture topFut = topologyReadLock(); - - try { - if (topFut == null) { - assert isDone(); - - return; - } - - if (topFut.isDone()) { - StringBuilder invalidCaches = new StringBuilder(); - Boolean cacheInvalid = false; - for (GridCacheContext ctx : cctx.cacheContexts()) { - if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) { - if (cacheInvalid) - invalidCaches.append(", "); - - invalidCaches.append(U.maskName(ctx.name())); - - cacheInvalid = true; - } - } - - if (cacheInvalid) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - invalidCaches.toString())); - - return; - } - - tx.topologyVersion(topFut.topologyVersion()); - - prepare0(); - } - else { - topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override - public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override - public void run() { - prepareOnTopology(); - } - }); - } - }); - } - } - finally { - topologyReadUnlock(); - } - } - - /** - * Acquires topology read lock. - * - * @return Topology ready future. - */ - private GridDhtTopologyFuture topologyReadLock() { - if (tx.activeCacheIds().isEmpty()) - return cctx.exchange().lastTopologyFuture(); - - GridCacheContext<K, V> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx == null) - return cctx.exchange().lastTopologyFuture(); - - nonLocCtx.topology().readLock(); - - if (nonLocCtx.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - nonLocCtx.name())); - - return null; - } - - return nonLocCtx.topology().topologyVersionFuture(); - } - - /** - * Releases topology read lock. - */ - private void topologyReadUnlock() { - if (!tx.activeCacheIds().isEmpty()) { - GridCacheContext<K, V> nonLocalCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocalCtx = cacheCtx; - - break; - } - } - - if (nonLocalCtx != null) - nonLocalCtx.topology().readUnlock(); - } - } - - /** - * Initializes future. - */ - private void prepare0() { - assert tx.optimistic(); - - try { - if (!tx.state(PREPARING)) { - if (tx.setRollbackOnly()) { - if (tx.timedOut()) - onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + - "was rolled back: " + this)); - else - onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + - "[state=" + tx.state() + ", tx=" + this + ']')); - } - else - onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + - "prepare [state=" + tx.state() + ", tx=" + this + ']')); - - return; - } - - // Make sure to add future before calling prepare. - cctx.mvcc().addFuture(this); - - prepare( - tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), - tx.writeEntries()); - - markInitialized(); - } - catch (TransactionTimeoutException | TransactionOptimisticException e) { - onError(cctx.localNodeId(), null, e); - } - catch (IgniteCheckedException e) { - onDone(e); - } - } - - /** - * @param reads Read entries. - * @param writes Write entries. - * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node. - */ - private void prepare( - Iterable<IgniteTxEntry> reads, - Iterable<IgniteTxEntry> writes - ) throws IgniteCheckedException { - assert tx.optimistic(); - - AffinityTopologyVersion topVer = tx.topologyVersion(); - - assert topVer.topologyVersion() > 0; - - txMapping = new GridDhtTxMapping<>(); - - ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = - new ConcurrentLinkedDeque8<>(); - - if (!F.isEmpty(reads) || !F.isEmpty(writes)) { - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " + - "partition nodes left the grid): " + cacheCtx.name())); - - return; - } - } - } - - // Assign keys to primary nodes. - GridDistributedTxMapping cur = null; - - for (IgniteTxEntry read : reads) { - GridDistributedTxMapping updated = map(read, topVer, cur, false); - - if (cur != updated) { - mappings.offer(updated); - - if (updated.node().isLocal()) { - if (read.context().isNear()) - tx.nearLocallyMapped(true); - else if (read.context().isColocated()) - tx.colocatedLocallyMapped(true); - } - - cur = updated; - } - } - - for (IgniteTxEntry write : writes) { - GridDistributedTxMapping updated = map(write, topVer, cur, true); - - if (cur != updated) { - mappings.offer(updated); - - if (updated.node().isLocal()) { - if (write.context().isNear()) - tx.nearLocallyMapped(true); - else if (write.context().isColocated()) - tx.colocatedLocallyMapped(true); - } - - cur = updated; - } - } - - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Abandoning (re)map because future is done: " + this); - - return; - } - - tx.addEntryMapping(mappings); - - cctx.mvcc().recheckPendingLocks(); - - txMapping.initLast(mappings); - - tx.transactionNodes(txMapping.transactionNodes()); - - checkOnePhase(); - - proceedPrepare(mappings); - } - - /** - * - */ - private void preparePessimistic() { - Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); - - AffinityTopologyVersion topVer = tx.topologyVersion(); - - txMapping = new GridDhtTxMapping<>(); - - for (IgniteTxEntry txEntry : tx.allEntries()) { - GridCacheContext cacheCtx = txEntry.context(); - - List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer); - - ClusterNode primary = F.first(nodes); - - boolean near = cacheCtx.isNear(); - - IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near); - - GridDistributedTxMapping nodeMapping = mappings.get(key); - - if (nodeMapping == null) { - nodeMapping = new GridDistributedTxMapping(primary); - - nodeMapping.near(cacheCtx.isNear()); - - mappings.put(key, nodeMapping); - } - - txEntry.nodeId(primary.id()); - - nodeMapping.add(txEntry); - - txMapping.addMapping(nodes); - } - - tx.transactionNodes(txMapping.transactionNodes()); - - checkOnePhase(); - - for (final GridDistributedTxMapping m : mappings.values()) { - final ClusterNode node = m.node(); - - GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( - futId, - tx.topologyVersion(), - tx, - m.reads(), - m.writes(), - /*grp lock key*/null, - /*part lock*/false, - m.near(), - txMapping.transactionNodes(), - true, - txMapping.transactionNodes().get(node.id()), - tx.onePhaseCommit(), - tx.needReturnValue() && tx.implicit(), - tx.implicitSingle(), - m.explicitLock(), - tx.subjectId(), - tx.taskNameHash()); - - for (IgniteTxEntry txEntry : m.writes()) { - if (txEntry.op() == TRANSFORM) - req.addDhtVersion(txEntry.txKey(), null); - } - - final MiniFuture fut = new MiniFuture(m, null); - - req.miniId(fut.futureId()); - - add(fut); - - if (node.isLocal()) { - cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() { - @Override public void apply(GridNearTxPrepareResponse res) { - fut.onResult(node.id(), res); - } - }); - } - else { - try { - cctx.io().send(node, req, tx.ioPolicy()); - } - catch (IgniteCheckedException e) { - // Fail the whole thing. - fut.onResult(e); - } - } - } - - markInitialized(); - } - - /** - * Checks if mapped transaction can be committed on one phase. - * One-phase commit can be done if transaction maps to one primary node and not more than one backup. - */ - private void checkOnePhase() { - if (tx.storeUsed()) - return; - - Map<UUID, Collection<UUID>> map = txMapping.transactionNodes(); - - if (map.size() == 1) { - Map.Entry<UUID, Collection<UUID>> entry = F.firstEntry(map); - - assert entry != null; - - Collection<UUID> backups = entry.getValue(); - - if (backups.size() <= 1) - tx.onePhaseCommit(true); - } - } - - /** - * Continues prepare after previous mapping successfully finished. - * - * @param mappings Queue of mappings. - */ - private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) { - if (isDone()) - return; - - final GridDistributedTxMapping m = mappings.poll(); - - if (m == null) - return; - - assert !m.empty(); - - final ClusterNode n = m.node(); - - GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( - futId, - tx.topologyVersion(), - tx, - tx.optimistic() && tx.serializable() ? m.reads() : null, - m.writes(), - tx.groupLockKey(), - tx.partitionLock(), - m.near(), - txMapping.transactionNodes(), - m.last(), - m.lastBackups(), - tx.onePhaseCommit(), - tx.needReturnValue() && tx.implicit(), - tx.implicitSingle(), - m.explicitLock(), - tx.subjectId(), - tx.taskNameHash()); - - for (IgniteTxEntry txEntry : m.writes()) { - if (txEntry.op() == TRANSFORM) - req.addDhtVersion(txEntry.txKey(), null); - } - - // Must lock near entries separately. - if (m.near()) { - try { - tx.optimisticLockEntries(req.writes()); - - tx.userPrepare(); - } - catch (IgniteCheckedException e) { - onError(null, null, e); - } - } - - final MiniFuture fut = new MiniFuture(m, mappings); - - req.miniId(fut.futureId()); - - add(fut); // Append new future. - - // If this is the primary node for the keys. - if (n.isLocal()) { - // At this point, if any new node joined, then it is - // waiting for this transaction to complete, so - // partition reassignments are not possible here. - cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() { - @Override public void apply(GridNearTxPrepareResponse res) { - fut.onResult(n.id(), res); - } - }); - } - else { - assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx + - ", nodeId=" + n.id() + ']'; - - try { - cctx.io().send(n, req, tx.ioPolicy()); - } - catch (IgniteCheckedException e) { - // Fail the whole thing. - fut.onResult(e); - } - } - } - - /** - * @param entry Transaction entry. - * @param topVer Topology version. - * @param cur Current mapping. - * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key. - * @return Mapping. - */ - private GridDistributedTxMapping map( - IgniteTxEntry entry, - AffinityTopologyVersion topVer, - GridDistributedTxMapping cur, - boolean waitLock - ) throws IgniteCheckedException { - GridCacheContext cacheCtx = entry.context(); - - List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer); - - txMapping.addMapping(nodes); - - ClusterNode primary = F.first(nodes); - - assert primary != null; - - if (log.isDebugEnabled()) { - log.debug("Mapped key to primary node [key=" + entry.key() + - ", part=" + cacheCtx.affinity().partition(entry.key()) + - ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']'); - } - - if (tx.groupLock() && !primary.isLocal()) - throw new IgniteCheckedException("Failed to prepare group lock transaction (local node is not primary for " + - " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() + ']'); - - // Must re-initialize cached entry while holding topology lock. - if (cacheCtx.isNear()) - entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); - else if (!cacheCtx.isLocal()) - entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true)); - else - entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); - - if (cacheCtx.isNear() || cacheCtx.isLocal()) { - if (waitLock && entry.explicitVersion() == null) { - if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey())) - lockKeys.add(entry.txKey()); - } - } - - if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { - cur = new GridDistributedTxMapping(primary); - - // Initialize near flag right away. - cur.near(cacheCtx.isNear()); - } - - cur.add(entry); - - if (entry.explicitVersion() != null) { - tx.markExplicit(primary.id()); - - cur.markExplicitLock(); - } - - entry.nodeId(primary.id()); - - if (cacheCtx.isNear()) { - while (true) { - try { - GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached(); - - cached.dhtNodeId(tx.xidVersion(), primary.id()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - entry.cached(cacheCtx.near().entryEx(entry.key())); - } - } - } - - return cur; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridNearTxPrepareFuture.class, this, super.toString()); - } - - /** - * Mini-future for get operations. Mini-futures are only waiting on a single - * node as opposed to multiple nodes. - */ - private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Keys. */ - @GridToStringInclude - private GridDistributedTxMapping m; - - /** Flag to signal some result being processed. */ - private AtomicBoolean rcvRes = new AtomicBoolean(false); - - /** Mappings to proceed prepare. */ - private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings; - - /** - * @param m Mapping. - * @param mappings Queue of mappings to proceed with. - */ - MiniFuture( - GridDistributedTxMapping m, - ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings - ) { - this.m = m; - this.mappings = mappings; - } - - /** - * @return Future ID. - */ - IgniteUuid futureId() { - return futId; - } - - /** - * @return Node ID. - */ - public ClusterNode node() { - return m.node(); - } - - /** - * @return Keys. - */ - public GridDistributedTxMapping mapping() { - return m; - } - - /** - * @param e Error. - */ - void onResult(Throwable e) { - if (rcvRes.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - // Fail. - onDone(e); - } - else - U.warn(log, "Received error after another result has been processed [fut=" + - GridNearTxPrepareFuture.this + ", mini=" + this + ']', e); - } - - /** - * @param e Node failure. - */ - void onResult(ClusterTopologyCheckedException e) { - if (isDone()) - return; - - if (rcvRes.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); - - // Fail the whole future (make sure not to remap on different primary node - // to prevent multiple lock coordinators). - onError(null, null, e); - } - } - - /** - * @param nodeId Failed node ID. - * @param res Result callback. - */ - void onResult(UUID nodeId, GridNearTxPrepareResponse res) { - if (isDone()) - return; - - if (rcvRes.compareAndSet(false, true)) { - if (res.error() != null) { - // Fail the whole compound future. - onError(nodeId, mappings, res.error()); - } - else { - assert F.isEmpty(res.invalidPartitions()); - - for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) { - IgniteTxEntry txEntry = tx.entry(entry.getKey()); - - assert txEntry != null; - - GridCacheContext cacheCtx = txEntry.context(); - - while (true) { - try { - if (cacheCtx.isNear()) { - GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached(); - - CacheVersionedValue tup = entry.getValue(); - - nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(), - tup.version(), m.node().id(), tx.topologyVersion()); - } - else if (txEntry.cached().detached()) { - GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached(); - - CacheVersionedValue tup = entry.getValue(); - - detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion()); - } - - break; - } - catch (GridCacheEntryRemovedException ignored) { - // Retry. - } - catch (IgniteCheckedException e) { - // Fail the whole compound future. - onError(nodeId, mappings, e); - - return; - } - } - } - - tx.implicitSingleResult(res.returnValue()); - - for (IgniteTxKey key : res.filterFailedKeys()) { - IgniteTxEntry txEntry = tx.entry(key); - - assert txEntry != null : "Missing tx entry for write key: " + key; - - txEntry.op(NOOP); - - assert txEntry.context() != null; - - ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry); - - if (expiry != null) - txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess())); - } - - if (!m.empty()) { - // Register DHT version. - tx.addDhtVersion(m.node().id(), res.dhtVersion()); - - m.dhtVersion(res.dhtVersion()); - - if (m.near()) - tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions()); - } - - // Proceed prepare before finishing mini future. - if (mappings != null) - proceedPrepare(mappings); - - // Finish this mini future. - onDone(tx); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java new file mode 100644 index 0000000..60b918c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import javax.cache.expiry.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; + +/** + * Common code for tx prepare in optimistic and pessimistic modes. + */ +public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentityFuture<IgniteInternalTx> + implements GridCacheFuture<IgniteInternalTx> { + /** Logger reference. */ + protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** */ + private static final IgniteReducer<IgniteInternalTx, IgniteInternalTx> REDUCER = + new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() { + @Override public boolean collect(IgniteInternalTx e) { + return true; + } + + @Override public IgniteInternalTx reduce() { + // Nothing to aggregate. + return null; + } + }; + + /** Logger. */ + protected static IgniteLogger log; + + /** Context. */ + protected GridCacheSharedContext<?, ?> cctx; + + /** Future ID. */ + protected IgniteUuid futId; + + /** Transaction. */ + @GridToStringInclude + protected GridNearTxLocal tx; + + /** Error. */ + @GridToStringExclude + protected AtomicReference<Throwable> err = new AtomicReference<>(null); + + /** Trackable flag. */ + protected boolean trackable = true; + + /** Full information about transaction nodes mapping. */ + protected GridDhtTxMapping txMapping; + + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final GridNearTxLocal tx) { + super(cctx.kernalContext(), REDUCER); + + assert cctx != null; + assert tx != null; + + this.cctx = cctx; + this.tx = tx; + + futId = IgniteUuid.randomUuid(); + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFutureAdapter.class); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return tx.xidVersion(); + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + trackable = false; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return trackable; + } + + /** + * Prepares transaction. + */ + public abstract void prepare(); + + /** + * @param nodeId Sender. + * @param res Result. + */ + public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res); + + /** + * Checks if mapped transaction can be committed on one phase. + * One-phase commit can be done if transaction maps to one primary node and not more than one backup. + */ + protected final void checkOnePhase() { + if (tx.storeUsed()) + return; + + Map<UUID, Collection<UUID>> map = txMapping.transactionNodes(); + + if (map.size() == 1) { + Map.Entry<UUID, Collection<UUID>> entry = F.firstEntry(map); + + assert entry != null; + + Collection<UUID> backups = entry.getValue(); + + if (backups.size() <= 1) + tx.onePhaseCommit(true); + } + } + + /** + * @param m Mapping. + * @param res Response. + */ + protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) { + if (res == null) + return; + + assert res.error() == null : res; + assert F.isEmpty(res.invalidPartitions()) : res; + + for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) { + IgniteTxEntry txEntry = tx.entry(entry.getKey()); + + assert txEntry != null; + + GridCacheContext cacheCtx = txEntry.context(); + + while (true) { + try { + if (cacheCtx.isNear()) { + GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached(); + + CacheVersionedValue tup = entry.getValue(); + + nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(), + tup.version(), m.node().id(), tx.topologyVersion()); + } + else if (txEntry.cached().detached()) { + GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached(); + + CacheVersionedValue tup = entry.getValue(); + + detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion()); + } + + break; + } + catch (GridCacheEntryRemovedException ignored) { + // Retry. + } + } + } + + tx.implicitSingleResult(res.returnValue()); + + for (IgniteTxKey key : res.filterFailedKeys()) { + IgniteTxEntry txEntry = tx.entry(key); + + assert txEntry != null : "Missing tx entry for write key: " + key; + + txEntry.op(NOOP); + + assert txEntry.context() != null; + + ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry); + + if (expiry != null) + txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess())); + } + + if (!m.empty()) { + // Register DHT version. + tx.addDhtVersion(m.node().id(), res.dhtVersion()); + + m.dhtVersion(res.dhtVersion()); + + if (m.near()) + tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index f0587ac..a08637d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -88,8 +88,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param tx Transaction. * @param reads Read entries. * @param writes Write entries. - * @param grpLockKey Group lock key if preparing group-lock transaction. - * @param partLock {@code True} if preparing group-lock transaction with partition lock. * @param near {@code True} if mapping is for near caches. * @param txNodes Transaction nodes mapping. * @param last {@code True} if this last prepare request for node. @@ -103,8 +101,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { IgniteInternalTx tx, Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes, - IgniteTxKey grpLockKey, - boolean partLock, boolean near, Map<UUID, Collection<UUID>> txNodes, boolean last, @@ -116,7 +112,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { @Nullable UUID subjId, int taskNameHash ) { - super(tx, reads, writes, grpLockKey, partLock, txNodes, onePhaseCommit); + super(tx, reads, writes, txNodes, onePhaseCommit); assert futId != null; @@ -270,67 +266,67 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { } switch (writer.state()) { - case 25: + case 23: if (!writer.writeBoolean("explicitLock", explicitLock)) return false; writer.incrementState(); - case 26: + case 24: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 27: + case 25: if (!writer.writeBoolean("implicitSingle", implicitSingle)) return false; writer.incrementState(); - case 28: + case 26: if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); - case 29: + case 27: if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) return false; writer.incrementState(); - case 30: + case 28: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 31: + case 29: if (!writer.writeBoolean("near", near)) return false; writer.incrementState(); - case 32: + case 30: if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); - case 33: + case 31: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 34: + case 32: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 35: + case 33: if (!writer.writeMessage("topVer", topVer)) return false; @@ -352,7 +348,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { return false; switch (reader.state()) { - case 25: + case 23: explicitLock = reader.readBoolean("explicitLock"); if (!reader.isLastRead()) @@ -360,7 +356,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 26: + case 24: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -368,7 +364,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 27: + case 25: implicitSingle = reader.readBoolean("implicitSingle"); if (!reader.isLastRead()) @@ -376,7 +372,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 28: + case 26: last = reader.readBoolean("last"); if (!reader.isLastRead()) @@ -384,7 +380,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 29: + case 27: lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID); if (!reader.isLastRead()) @@ -392,7 +388,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 30: + case 28: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -400,7 +396,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 31: + case 29: near = reader.readBoolean("near"); if (!reader.isLastRead()) @@ -408,7 +404,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 32: + case 30: retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) @@ -416,7 +412,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 33: + case 31: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -424,7 +420,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 34: + case 32: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -432,7 +428,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 35: + case 33: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -452,7 +448,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 36; + return 34; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index b6b6017..49283cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -51,9 +51,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { /** Owned versions. */ private Map<IgniteTxKey, GridCacheVersion> owned; - /** Group lock flag. */ - private boolean grpLock; - /** * Empty constructor required for {@link Externalizable}. */ @@ -78,7 +75,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { * @param writeEntries Write entries. * @param ctx Cache registry. * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. * @throws IgniteCheckedException If unmarshalling failed. */ public GridNearTxRemote( @@ -97,12 +93,11 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { long timeout, Collection<IgniteTxEntry> writeEntries, int txSize, - @Nullable IgniteTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) throws IgniteCheckedException { super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, - txSize, grpLockKey, subjId, taskNameHash); + txSize, subjId, taskNameHash); assert nearNodeId != null; @@ -138,7 +133,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { * @param timeout Timeout. * @param ctx Cache registry. * @param txSize Expected transaction size. - * @param grpLockKey Collection of group lock keys if this is a group-lock transaction. */ public GridNearTxRemote( GridCacheSharedContext ctx, @@ -155,12 +149,11 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { boolean invalidate, long timeout, int txSize, - @Nullable IgniteTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) { super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, - txSize, grpLockKey, subjId, taskNameHash); + txSize, subjId, taskNameHash); assert nearNodeId != null; @@ -192,19 +185,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { } /** - * Marks near local transaction as group lock. Note that near remote transaction may be - * marked as group lock even if it does not contain any locked key. - */ - public void markGroupLock() { - grpLock = true; - } - - /** {@inheritDoc} */ - @Override public boolean groupLock() { - return grpLock || super.groupLock(); - } - - /** * @return Near transaction ID. */ @Override public GridCacheVersion nearXidVersion() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index 84d4c90..ea59f1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -29,6 +29,9 @@ import static org.apache.ignite.events.EventType.*; */ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope"}) public class GridLocalCacheEntry extends GridCacheMapEntry { + /** Off-heap value pointer. */ + private long valPtr; + /** * @param ctx Cache registry. * @param key Cache key. @@ -384,4 +387,19 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { return doomed != null; } + + /** {@inheritDoc} */ + @Override protected boolean hasOffHeapPointer() { + return valPtr != 0; + } + + /** {@inheritDoc} */ + @Override protected long offHeapPointer() { + return valPtr; + } + + /** {@inheritDoc} */ + @Override protected void offHeapPointer(long valPtr) { + this.valPtr = valPtr; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 8dc07cc..5f877ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -284,16 +284,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public boolean empty(); /** - * @return {@code True} if transaction group-locked. - */ - public boolean groupLock(); - - /** - * @return Group lock key if {@link #groupLock()} is {@code true}. - */ - @Nullable public IgniteTxKey groupLockKey(); - - /** * @return {@code True} if preparing flag was set with this call. */ public boolean markPreparing(); @@ -551,7 +541,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { * * @return Future for prepare step. */ - public IgniteInternalFuture<IgniteInternalTx> prepareAsync(); + public IgniteInternalFuture<?> prepareAsync(); /** * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) @@ -580,7 +570,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { /** * @return Future for transaction prepare if prepare is in progress. */ - @Nullable public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture(); + @Nullable public IgniteInternalFuture<?> currentPrepareFuture(); /** * @param state Transaction state. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 044c3d7..99907e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -160,9 +160,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { isolation, timeout, true, - txSize, - /** group lock keys */null, - /** partition lock */false + txSize ); assert tx != null;