#ignite-9655 - Manual merge of changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0354a41b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0354a41b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0354a41b Branch: refs/heads/sprint-1 Commit: 0354a41b94e891273a06587c6cea95aaee15d995 Parents: 31ce585 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Jan 28 18:48:49 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Jan 28 18:48:49 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 34 +- .../processors/cache/GridCacheUtils.java | 15 - .../distributed/GridCacheCommittedTxInfo.java | 2 - .../GridCachePerThreadTxCommitBuffer.java | 185 ------ ...dCachePessimisticCheckCommittedTxFuture.java | 380 ------------ ...CachePessimisticCheckCommittedTxRequest.java | 292 --------- ...achePessimisticCheckCommittedTxResponse.java | 231 ------- .../distributed/GridCacheTxCommitBuffer.java | 60 -- .../distributed/GridDistributedLockRequest.java | 83 --- .../GridDistributedTxFinishRequest.java | 90 +-- .../GridDistributedTxPrepareRequest.java | 49 +- .../GridDistributedTxRemoteAdapter.java | 53 +- .../distributed/dht/GridDhtLockFuture.java | 41 +- .../distributed/dht/GridDhtLockRequest.java | 6 +- .../dht/GridDhtTransactionalCacheAdapter.java | 19 +- .../distributed/dht/GridDhtTxFinishFuture.java | 16 +- .../distributed/dht/GridDhtTxFinishRequest.java | 70 +-- .../cache/distributed/dht/GridDhtTxLocal.java | 21 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 108 ++-- .../distributed/dht/GridDhtTxPrepareFuture.java | 503 ++++++++++----- .../dht/GridDhtTxPrepareRequest.java | 4 +- .../cache/distributed/dht/GridDhtTxRemote.java | 12 +- .../colocated/GridDhtColocatedLockFuture.java | 41 -- .../distributed/near/GridNearLockFuture.java | 8 - 
.../distributed/near/GridNearLockRequest.java | 6 +- .../near/GridNearTransactionalCache.java | 3 +- .../near/GridNearTxFinishFuture.java | 2 - .../near/GridNearTxFinishRequest.java | 6 +- .../cache/distributed/near/GridNearTxLocal.java | 46 +- .../near/GridNearTxPrepareFuture.java | 308 ++++++++-- .../near/GridNearTxPrepareRequest.java | 29 +- .../near/GridNearTxPrepareResponse.java | 63 +- .../cache/transactions/IgniteTxAdapter.java | 31 +- .../cache/transactions/IgniteTxEntry.java | 17 - .../cache/transactions/IgniteTxEx.java | 7 - .../cache/transactions/IgniteTxHandler.java | 608 +++---------------- .../transactions/IgniteTxLocalAdapter.java | 118 +++- .../cache/transactions/IgniteTxLocalEx.java | 5 + .../cache/transactions/IgniteTxManager.java | 208 ++----- .../cache/transactions/IgniteTxProxyImpl.java | 7 + .../util/GridBoundedConcurrentOrderedMap.java | 20 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 182 +++--- .../cache/GridCacheAbstractMetricsSelfTest.java | 4 +- .../cache/IgnitePutAllLargeBatchSelfTest.java | 301 +++++++++ ...tAllUpdateNonPreloadedPartitionSelfTest.java | 129 ++++ .../distributed/GridCacheEventAbstractTest.java | 2 +- ...cOriginatingNodeFailureAbstractSelfTest.java | 29 +- ...ssimisticOriginatingNodeFailureSelfTest.java | 2 +- .../near/GridCacheNearMetricsSelfTest.java | 8 +- .../near/GridCacheNearReadersSelfTest.java | 2 +- ...ssimisticOriginatingNodeFailureSelfTest.java | 2 +- ...ssimisticOriginatingNodeFailureSelfTest.java | 5 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 53 files changed, 1758 insertions(+), 2718 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9271e29..216c13c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4190,7 +4190,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, true, op.single(), ctx.system(), - PESSIMISTIC, + OPTIMISTIC, READ_COMMITTED, tCfg.getDefaultTxTimeout(), ctx.hasFlag(INVALIDATE), @@ -4265,7 +4265,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, true, op.single(), ctx.system(), - PESSIMISTIC, + OPTIMISTIC, READ_COMMITTED, ctx.kernalContext().config().getTransactionsConfiguration().getDefaultTxTimeout(), ctx.hasFlag(INVALIDATE), @@ -4301,13 +4301,24 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, try { IgniteFuture fut = holder.future(); - if (fut != null && !fut.isDone()) { - final IgniteTxLocalAdapter<K, V> tx0 = tx; + final IgniteTxLocalAdapter<K, V> tx0 = tx; + if (fut != null && !fut.isDone()) { IgniteFuture<T> f = new GridEmbeddedFuture<>(fut, new C2<T, Exception, IgniteFuture<T>>() { @Override public IgniteFuture<T> apply(T t, Exception e) { - return op.op(tx0); + return op.op(tx0).chain(new CX1<IgniteFuture<T>, T>() { + @Override public T applyx(IgniteFuture<T> tFut) throws IgniteCheckedException { + try { + return tFut.get(); + } + catch (IgniteCheckedException e1) { + tx0.rollbackAsync(); + + throw e1; + } + } + }); } }, ctx.kernalContext()); @@ -4316,7 +4327,18 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return f; } - IgniteFuture<T> f = op.op(tx); + IgniteFuture<T> f = op.op(tx).chain(new CX1<IgniteFuture<T>, T>() { + @Override public T applyx(IgniteFuture<T> tFut) throws IgniteCheckedException { + try { + 
return tFut.get(); + } + catch (IgniteCheckedException e1) { + tx0.rollbackAsync(); + + throw e1; + } + } + }); saveFuture(holder, f); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 129dc54..1ce00b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -134,13 +134,6 @@ public class GridCacheUtils { } }; - /** Transfer required predicate. */ - private static final IgnitePredicate TRANSFER_REQUIRED_PREDICATE = new P1<IgniteTxEntry>() { - @Override public boolean apply(IgniteTxEntry e) { - return e.transferRequired(); - } - }; - /** Transaction entry to key. */ private static final IgniteClosure tx2key = new C1<IgniteTxEntry, Object>() { @Override public Object apply(IgniteTxEntry e) { @@ -843,14 +836,6 @@ public class GridCacheUtils { } /** - * @return Transfer required predicate. - */ - @SuppressWarnings("unchecked") - public static <K, V> IgnitePredicate<IgniteTxEntry<K, V>> transferRequired() { - return TRANSFER_REQUIRED_PREDICATE; - } - - /** * Gets type filter for projections. * * @param keyType Key type. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java index 8231465..23ce77b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java @@ -58,8 +58,6 @@ public class GridCacheCommittedTxInfo<K, V> implements Externalizable { originatingTxId = tx.nearXidVersion(); originatingNodeId = tx.eventNodeId(); - - recoveryWrites = tx.recoveryWrites(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java deleted file mode 100644 index 0612a5d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.timeout.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Committed tx buffer which should be used in synchronous commit mode. - */ -public class GridCachePerThreadTxCommitBuffer<K, V> implements GridCacheTxCommitBuffer<K, V> { - /** Logger. */ - private IgniteLogger log; - - /** Cache context. */ - private GridCacheSharedContext<K, V> cctx; - - /** Store map. */ - private Map<StoreKey, GridCacheCommittedTxInfo<K, V>> infoMap; - - /** - * @param cctx Cache context. - */ - public GridCachePerThreadTxCommitBuffer(GridCacheSharedContext<K, V> cctx) { - this.cctx = cctx; - - log = cctx.logger(GridCachePerThreadTxCommitBuffer.class); - - int logSize = cctx.txConfig().getPessimisticTxLogSize(); - - infoMap = logSize > 0 ? 
- new GridBoundedConcurrentLinkedHashMap<StoreKey, GridCacheCommittedTxInfo<K, V>>(logSize) : - new ConcurrentHashMap8<StoreKey, GridCacheCommittedTxInfo<K, V>>(); - } - - /** {@inheritDoc} */ - @Override public void addCommittedTx(IgniteTxEx<K, V> tx) { - long threadId = tx.threadId(); - - StoreKey key = new StoreKey(tx.eventNodeId(), threadId); - - if (log.isDebugEnabled()) - log.debug("Adding committed transaction [locNodeId=" + cctx.localNodeId() + ", key=" + key + - ", tx=" + tx + ']'); - - infoMap.put(key, new GridCacheCommittedTxInfo<>(tx)); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridCacheCommittedTxInfo<K, V> committedTx(GridCacheVersion originatingTxVer, - UUID nodeId, long threadId) { - assert originatingTxVer != null; - - StoreKey key = new StoreKey(nodeId, threadId); - - GridCacheCommittedTxInfo<K, V> txInfo = infoMap.get(key); - - if (log.isDebugEnabled()) - log.debug("Got committed transaction info by key [locNodeId=" + cctx.localNodeId() + - ", key=" + key + ", originatingTxVer=" + originatingTxVer + ", txInfo=" + txInfo + ']'); - - if (txInfo == null || !originatingTxVer.equals(txInfo.originatingTxId())) - return null; - - return txInfo; - } - - /** - * @param nodeId Left node ID. - */ - @Override public void onNodeLeft(UUID nodeId) { - // Clear all node's records after clear interval. - cctx.kernalContext().timeout().addTimeoutObject( - new NodeLeftTimeoutObject(cctx.txConfig().getPessimisticTxLogLinger(), nodeId)); - } - - /** {@inheritDoc} */ - @Override public int size() { - return infoMap.size(); - } - - /** - * Store key. - */ - private static class StoreKey { - /** Node ID which started transaction. */ - private UUID nodeId; - - /** Thread ID which started transaction. */ - private long threadId; - - /** - * @param nodeId Node ID. - * @param threadId Thread ID. - */ - private StoreKey(UUID nodeId, long threadId) { - this.nodeId = nodeId; - this.threadId = threadId; - } - - /** - * @return Node ID. 
- */ - public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - StoreKey storeKey = (StoreKey)o; - - return threadId == storeKey.threadId && nodeId.equals(storeKey.nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + (int)(threadId ^ (threadId >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(StoreKey.class, this); - } - } - - /** - * Node left timeout object which will clear all committed records from left node. - */ - private class NodeLeftTimeoutObject extends GridTimeoutObjectAdapter { - /** Left node ID. */ - private UUID leftNodeId; - - /** - * @param timeout Timeout. - * @param leftNodeId Left node ID. - */ - protected NodeLeftTimeoutObject(long timeout, UUID leftNodeId) { - super(timeout); - - this.leftNodeId = leftNodeId; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - Iterator<StoreKey> it = infoMap.keySet().iterator(); - - while (it.hasNext()) { - StoreKey key = it.next(); - - if (leftNodeId.equals(key.nodeId())) - it.remove(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java deleted file mode 100644 index 12b9177..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Future verifying that all remote transactions related to some - * optimistic transaction were prepared. - */ -public class GridCachePessimisticCheckCommittedTxFuture<K, V> extends GridCompoundIdentityFuture<GridCacheCommittedTxInfo<K, V>> - implements GridCacheFuture<GridCacheCommittedTxInfo<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Trackable flag. */ - private boolean trackable = true; - - /** Context. */ - private final GridCacheSharedContext<K, V> cctx; - - /** Future ID. 
*/ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Transaction. */ - private final IgniteTxEx<K, V> tx; - - /** All involved nodes. */ - private final Map<UUID, ClusterNode> nodes; - - /** ID of failed node started transaction. */ - private final UUID failedNodeId; - - /** Flag indicating that future checks near node instead of checking all topology in case of primary node crash. */ - private boolean nearCheck; - - /** - * @param cctx Context. - * @param tx Transaction. - * @param failedNodeId ID of failed node started transaction. - */ - @SuppressWarnings("ConstantConditions") - public GridCachePessimisticCheckCommittedTxFuture(GridCacheSharedContext<K, V> cctx, IgniteTxEx<K, V> tx, - UUID failedNodeId) { - super(cctx.kernalContext(), new SingleReducer<K, V>()); - - this.cctx = cctx; - this.tx = tx; - this.failedNodeId = failedNodeId; - - nodes = new GridLeanMap<>(); - - for (ClusterNode node : CU.allNodes(cctx, tx.topologyVersion())) - nodes.put(node.id(), node); - } - - /** - * Initializes future. - */ - public void prepare() { - if (log.isDebugEnabled()) - log.debug("Checking if transaction was committed on remote nodes: " + tx); - - // Check local node first (local node can be a backup node for some part of this transaction). - long originatingThreadId = tx.threadId(); - - if (tx instanceof IgniteTxRemoteEx) - originatingThreadId = ((IgniteTxRemoteEx)tx).remoteThreadId(); - - GridCacheCommittedTxInfo<K, V> txInfo = cctx.tm().txCommitted(tx.nearXidVersion(), tx.eventNodeId(), - originatingThreadId); - - if (txInfo != null) { - onDone(txInfo); - - markInitialized(); - - return; - } - - Collection<ClusterNode> checkNodes = CU.remoteNodes(cctx, tx.topologyVersion()); - - if (tx instanceof GridDhtTxRemote) { - // If we got primary node failure and near node has not failed. 
- if (tx.nodeId().equals(failedNodeId) && !tx.eventNodeId().equals(failedNodeId)) { - nearCheck = true; - - ClusterNode nearNode = cctx.discovery().node(tx.eventNodeId()); - - if (nearNode == null) { - // Near node failed, separate check prepared future will take care of it. - onDone(new ClusterTopologyException("Failed to check near transaction state (near node left grid): " + - tx.eventNodeId())); - - return; - } - - checkNodes = Collections.singletonList(nearNode); - } - } - - for (ClusterNode rmtNode : checkNodes) { - // Skip left nodes and local node. - if (rmtNode.id().equals(failedNodeId)) - continue; - - GridCachePessimisticCheckCommittedTxRequest<K, V> req = new GridCachePessimisticCheckCommittedTxRequest<>( - tx, - originatingThreadId, futureId(), nearCheck); - - if (rmtNode.isLocal()) - add(cctx.tm().checkPessimisticTxCommitted(req)); - else { - MiniFuture fut = new MiniFuture(rmtNode.id()); - - req.miniId(fut.futureId()); - - add(fut); - - try { - cctx.io().send(rmtNode.id(), req); - } - catch (ClusterTopologyException ignored) { - fut.onNodeLeft(); - } - catch (IgniteCheckedException e) { - fut.onError(e); - - break; - } - } - } - - markInitialized(); - } - - /** - * @param nodeId Node ID. - * @param res Response. - */ - public void onResult(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - if (!isDone()) { - for (IgniteFuture<GridCacheCommittedTxInfo<K, V>> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.futureId().equals(res.miniId())) { - assert f.nodeId().equals(nodeId); - - f.onResult(res); - - break; - } - } - } - } - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return futId; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** {@inheritDoc} */ - @Override public Collection<? 
extends ClusterNode> nodes() { - return nodes.values(); - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.nodeId().equals(nodeId)) { - f.onNodeLeft(); - - return true; - } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean trackable() { - return trackable; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - trackable = false; - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable GridCacheCommittedTxInfo<K, V> res, @Nullable Throwable err) { - if (super.onDone(res, err)) { - cctx.mvcc().removeFuture(this); - - if (log.isDebugEnabled()) - log.debug("Completing check committed tx future for transaction [tx=" + tx + ", res=" + res + - ", err=" + err + ']'); - - if (err == null) - cctx.tm().finishPessimisticTxOnRecovery(tx, res); - else { - if (log.isDebugEnabled()) - log.debug("Failed to check prepared transactions, " + - "invalidating transaction [err=" + err + ", tx=" + tx + ']'); - - if (nearCheck) - return true; - - cctx.tm().salvageTx(tx); - } - - return true; - } - - return false; - } - - /** - * @param f Future. - * @return {@code True} if mini-future. - */ - private boolean isMini(IgniteFuture<?> f) { - return f.getClass().equals(MiniFuture.class); - } - - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCachePessimisticCheckCommittedTxFuture.class, this, "super", super.toString()); - } - - /** - * - */ - private class MiniFuture extends GridFutureAdapter<GridCacheCommittedTxInfo<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Mini future ID. */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Node ID. */ - private UUID nodeId; - - /** - * Empty constructor required by {@link Externalizable} - */ - public MiniFuture() { - // No-op. - } - - /** - * @param nodeId Node ID. 
- */ - private MiniFuture(UUID nodeId) { - super(cctx.kernalContext()); - - this.nodeId = nodeId; - } - - /** - * @return Node ID. - */ - private UUID nodeId() { - return nodeId; - } - - /** - * @return Future ID. - */ - private IgniteUuid futureId() { - return futId; - } - - /** - * @param e Error. - */ - private void onError(Throwable e) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - onDone(e); - } - - /** - */ - private void onNodeLeft() { - if (log.isDebugEnabled()) - log.debug("Transaction node left grid (will ignore) [fut=" + this + ']'); - - if (nearCheck) { - onDone(new ClusterTopologyException("Failed to check near transaction state (near node left grid): " + - nodeId)); - - return; - } - - onDone((GridCacheCommittedTxInfo<K, V>)null); - } - - /** - * @param res Result callback. - */ - private void onResult(GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - onDone(res.committedTxInfo()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MiniFuture.class, this, "done", isDone(), "err", error()); - } - } - - /** - * Single value reducer. - */ - private static class SingleReducer<K, V> implements - IgniteReducer<GridCacheCommittedTxInfo<K, V>, GridCacheCommittedTxInfo<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private AtomicReference<GridCacheCommittedTxInfo<K, V>> collected = new AtomicReference<>(); - - /** {@inheritDoc} */ - @Override public boolean collect(@Nullable GridCacheCommittedTxInfo<K, V> info) { - if (info != null) { - collected.compareAndSet(null, info); - - // Stop collecting on first collected info. 
- return false; - } - - return true; - } - - /** {@inheritDoc} */ - @Override public GridCacheCommittedTxInfo<K, V> reduce() { - return collected.get(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java deleted file mode 100644 index 012106d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Message sent to check that transactions related to some pessimistic transaction - * were prepared on remote node. - */ -public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistributedBaseMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Future ID. */ - private IgniteUuid futId; - - /** Mini future ID. */ - private IgniteUuid miniId; - - /** Near transaction ID. */ - private GridCacheVersion nearXidVer; - - /** Originating node ID. */ - private UUID originatingNodeId; - - /** Originating thread ID. */ - private long originatingThreadId; - - /** Flag indicating that this is near-only check. */ - @GridDirectVersion(1) - private boolean nearOnlyCheck; - - /** - * Empty constructor required by {@link Externalizable} - */ - public GridCachePessimisticCheckCommittedTxRequest() { - // No-op. - } - - /** - * @param tx Transaction. - * @param originatingThreadId Originating thread ID. - * @param futId Future ID. - */ - public GridCachePessimisticCheckCommittedTxRequest(IgniteTxEx<K, V> tx, long originatingThreadId, IgniteUuid futId, - boolean nearOnlyCheck) { - super(tx.xidVersion(), 0); - - this.futId = futId; - this.nearOnlyCheck = nearOnlyCheck; - - nearXidVer = tx.nearXidVersion(); - originatingNodeId = tx.eventNodeId(); - this.originatingThreadId = originatingThreadId; - } - - /** - * @return Near version. - */ - public GridCacheVersion nearXidVersion() { - return nearXidVer; - } - - /** - * @return Tx originating node ID. 
- */ - public UUID originatingNodeId() { - return originatingNodeId; - } - - /** - * @return Tx originating thread ID. - */ - public long originatingThreadId() { - return originatingThreadId; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Mini future ID. - */ - public IgniteUuid miniId() { - return miniId; - } - - /** - * @param miniId Mini ID to set. - */ - public void miniId(IgniteUuid miniId) { - this.miniId = miniId; - } - - /** - * @return Flag indicating that this request was sent only to near node. If this flag is set, no finalizing - * will be executed on receiving (near) node since this is a user node. - */ - public boolean nearOnlyCheck() { - return nearOnlyCheck; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridCachePessimisticCheckCommittedTxRequest _clone = new GridCachePessimisticCheckCommittedTxRequest(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridCachePessimisticCheckCommittedTxRequest _clone = (GridCachePessimisticCheckCommittedTxRequest)_msg; - - _clone.futId = futId; - _clone.miniId = miniId; - _clone.nearXidVer = nearXidVer; - _clone.originatingNodeId = originatingNodeId; - _clone.originatingThreadId = originatingThreadId; - _clone.nearOnlyCheck = nearOnlyCheck; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 8: - if (!commState.putGridUuid(futId)) - return false; - - commState.idx++; - - case 9: - if (!commState.putGridUuid(miniId)) 
- return false; - - commState.idx++; - - case 10: - if (!commState.putCacheVersion(nearXidVer)) - return false; - - commState.idx++; - - case 11: - if (!commState.putUuid(originatingNodeId)) - return false; - - commState.idx++; - - case 12: - if (!commState.putLong(originatingThreadId)) - return false; - - commState.idx++; - - case 13: - if (!commState.putBoolean(nearOnlyCheck)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 8: - IgniteUuid futId0 = commState.getGridUuid(); - - if (futId0 == GRID_UUID_NOT_READ) - return false; - - futId = futId0; - - commState.idx++; - - case 9: - IgniteUuid miniId0 = commState.getGridUuid(); - - if (miniId0 == GRID_UUID_NOT_READ) - return false; - - miniId = miniId0; - - commState.idx++; - - case 10: - GridCacheVersion nearXidVer0 = commState.getCacheVersion(); - - if (nearXidVer0 == CACHE_VER_NOT_READ) - return false; - - nearXidVer = nearXidVer0; - - commState.idx++; - - case 11: - UUID originatingNodeId0 = commState.getUuid(); - - if (originatingNodeId0 == UUID_NOT_READ) - return false; - - originatingNodeId = originatingNodeId0; - - commState.idx++; - - case 12: - if (buf.remaining() < 8) - return false; - - originatingThreadId = commState.getLong(); - - commState.idx++; - - case 13: - if (buf.remaining() < 1) - return false; - - nearOnlyCheck = commState.getBoolean(); - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 20; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCachePessimisticCheckCommittedTxRequest.class, this, "super", super.toString()); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java deleted file mode 100644 index 8b50645..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; - -/** - * Check prepared transactions response. - */ -public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDistributedBaseMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Future ID. */ - private IgniteUuid futId; - - /** Mini future ID. */ - private IgniteUuid miniId; - - /** Committed transaction info. */ - @GridDirectTransient - private GridCacheCommittedTxInfo<K, V> committedTxInfo; - - /** Serialized transaction info. */ - private byte[] committedTxInfoBytes; - - /** - * Empty constructor required by {@link Externalizable} - */ - public GridCachePessimisticCheckCommittedTxResponse() { - // No-op. - } - - /** - * @param txId Transaction ID. - * @param futId Future ID. - * @param miniId Mini future ID. - * @param committedTxInfo Committed transaction info. - */ - public GridCachePessimisticCheckCommittedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId, - @Nullable GridCacheCommittedTxInfo<K, V> committedTxInfo) { - super(txId, 0); - - this.futId = futId; - this.miniId = miniId; - this.committedTxInfo = committedTxInfo; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Mini future ID. - */ - public IgniteUuid miniId() { - return miniId; - } - - /** - * @return {@code True} if all remote transactions were prepared. 
- */ - public GridCacheCommittedTxInfo<K, V> committedTxInfo() { - return committedTxInfo; - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (committedTxInfo != null) { - marshalTx(committedTxInfo.recoveryWrites(), ctx); - - committedTxInfoBytes = ctx.marshaller().marshal(committedTxInfo); - } - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (committedTxInfoBytes != null) { - committedTxInfo = ctx.marshaller().unmarshal(committedTxInfoBytes, ldr); - - unmarshalTx(committedTxInfo.recoveryWrites(), false, ctx, ldr); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridCachePessimisticCheckCommittedTxResponse _clone = new GridCachePessimisticCheckCommittedTxResponse(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridCachePessimisticCheckCommittedTxResponse _clone = (GridCachePessimisticCheckCommittedTxResponse)_msg; - - _clone.futId = futId; - _clone.miniId = miniId; - _clone.committedTxInfo = committedTxInfo; - _clone.committedTxInfoBytes = committedTxInfoBytes; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 8: - if (!commState.putByteArray(committedTxInfoBytes)) - return false; - - commState.idx++; - - case 9: - if (!commState.putGridUuid(futId)) - 
return false; - - commState.idx++; - - case 10: - if (!commState.putGridUuid(miniId)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 8: - byte[] committedTxInfoBytes0 = commState.getByteArray(); - - if (committedTxInfoBytes0 == BYTE_ARR_NOT_READ) - return false; - - committedTxInfoBytes = committedTxInfoBytes0; - - commState.idx++; - - case 9: - IgniteUuid futId0 = commState.getGridUuid(); - - if (futId0 == GRID_UUID_NOT_READ) - return false; - - futId = futId0; - - commState.idx++; - - case 10: - IgniteUuid miniId0 = commState.getGridUuid(); - - if (miniId0 == GRID_UUID_NOT_READ) - return false; - - miniId = miniId0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 21; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCachePessimisticCheckCommittedTxResponse.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxCommitBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxCommitBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxCommitBuffer.java deleted file mode 100644 index 26a78f5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxCommitBuffer.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Buffer that stores transaction commit values in order to restore them in case of originating node crash. - */ -public interface GridCacheTxCommitBuffer<K, V> { - /** - * Adds committed transaction to commit buffer. - * - * @param tx Committed transaction. - */ - public void addCommittedTx(IgniteTxEx<K, V> tx); - - /** - * Gets transaction from commit buffer. - * - * @param originatingTxVer Originating tx version. - * @param nodeId Originating node ID. - * @param threadId Originating thread ID. - * @return Committed info, if any. - */ - @Nullable public GridCacheCommittedTxInfo<K, V> committedTx(GridCacheVersion originatingTxVer, UUID nodeId, - long threadId); - - /** - * Callback called when lode left grid. Used to eventually cleanup the queue from committed tx info from - * left node. - * - * @param nodeId Left node ID. - */ - public void onNodeLeft(UUID nodeId); - - /** - * @return Buffer size. 
- */ - public int size(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 4356dc4..da6ca72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -73,14 +73,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage @GridDirectTransient private List<K> keys; - /** Write entries. */ - @GridToStringInclude - @GridDirectTransient - private List<IgniteTxEntry<K, V>> writeEntries; - - /** Serialized write entries. */ - private byte[] writeEntriesBytes; - /** Array indicating whether value should be returned for a key. */ @GridToStringInclude private boolean[] retVals; @@ -102,10 +94,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage /** Partition lock flag. Only if group-lock transaction. */ private boolean partLock; - /** DR versions. */ - @GridToStringInclude - private GridCacheVersion[] drVersByIdx; - /** * Empty constructor. */ @@ -252,13 +240,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage } /** - * @return Write entries list. - */ - public List<IgniteTxEntry<K, V>> writeEntries() { - return writeEntries; - } - - /** * @return Tx size. */ public int txSize() { @@ -271,19 +252,15 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage * @param key Key. 
* @param retVal Flag indicating whether value should be returned. * @param keyBytes Key bytes. - * @param writeEntry Write entry. * @param cands Candidates. - * @param drVer DR version. * @param ctx Context. * @throws IgniteCheckedException If failed. */ public void addKeyBytes( K key, @Nullable byte[] keyBytes, - @Nullable IgniteTxEntry<K, V> writeEntry, boolean retVal, @Nullable Collection<GridCacheMvccCandidate<K>> cands, - @Nullable GridCacheVersion drVer, GridCacheContext<K, V> ctx ) throws IgniteCheckedException { if (ctx.deploymentEnabled()) @@ -302,21 +279,9 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage keys.add(key); candidatesByIndex(idx, cands); - drVersionByIndex(idx, drVer); retVals[idx] = retVal; - if (writeEntry != null) { - if (writeEntries == null) { - assert idx == 0 : "Cannot start adding write entries in the middle of lock message [idx=" + idx + - ", writeEntry=" + writeEntry + ']'; - - writeEntries = new ArrayList<>(keysCount()); - } - - writeEntries.add(writeEntry); - } - idx++; } @@ -355,39 +320,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage return timeout; } - /** - * @param idx Key index. - * @param drVer DR version. - */ - @SuppressWarnings({"unchecked"}) - public void drVersionByIndex(int idx, GridCacheVersion drVer) { - assert idx < keysCount(); - - // If nothing to add. - if (drVer == null) - return; - - if (drVersByIdx == null) - drVersByIdx = new GridCacheVersion[keysCount()]; - - drVersByIdx[idx] = drVer; - } - - /** - * @param idx Key index. - * @return DR versions for given key. - */ - public GridCacheVersion drVersionByIndex(int idx) { - return drVersByIdx == null ? null : drVersByIdx[idx]; - } - - /** - * @return All DR versions. 
- */ - public GridCacheVersion[] drVersions() { - return drVersByIdx; - } - /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { @@ -399,12 +331,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage grpLockKeyBytes = CU.marshal(ctx, grpLockKey); } - - if (writeEntries != null) { - marshalTx(writeEntries, ctx); - - writeEntriesBytes = ctx.marshaller().marshal(writeEntries); - } } /** {@inheritDoc} */ @@ -416,12 +342,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage if (grpLockKey == null && grpLockKeyBytes != null) grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr); - - if (writeEntriesBytes != null) { - writeEntries = ctx.marshaller().unmarshal(writeEntriesBytes, ldr); - - unmarshalTx(writeEntries, false, ctx, ldr); - } } /** {@inheritDoc} */ @@ -452,15 +372,12 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage _clone.isolation = isolation; _clone.keyBytes = keyBytes; _clone.keys = keys; - _clone.writeEntries = writeEntries; - _clone.writeEntriesBytes = writeEntriesBytes; _clone.retVals = retVals; _clone.idx = idx; _clone.txSize = txSize; _clone.grpLockKey = grpLockKey; _clone.grpLockKeyBytes = grpLockKeyBytes; _clone.partLock = partLock; - _clone.drVersByIdx = drVersByIdx; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index 3792536..389cf30 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -24,7 +24,6 @@ import org.apache.ignite.lang.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.direct.*; import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -63,24 +62,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes /** Min version used as base for completed versions. */ private GridCacheVersion baseVer; - /** Transaction write entries. */ - @GridToStringInclude - @GridDirectTransient - private Collection<IgniteTxEntry<K, V>> writeEntries; - - /** */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> writeEntriesBytes; - - /** Write entries which have not been transferred to nodes during lock request. */ - @GridToStringInclude - @GridDirectTransient - private Collection<IgniteTxEntry<K, V>> recoveryWrites; - - /** */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> recoveryWritesBytes; - /** Expected txSize. */ private int txSize; @@ -113,9 +94,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. * @param txSize Expected transaction size. - * @param writeEntries Write entries. - * @param recoveryWrites Recover entries. In pessimistic mode entries which were not transferred to remote nodes - * with lock requests. {@code Null} for optimistic mode. * @param grpLockKey Group lock key if this is a group-lock transaction. 
*/ public GridDistributedTxFinishRequest( @@ -132,11 +110,9 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, int txSize, - Collection<IgniteTxEntry<K, V>> writeEntries, - Collection<IgniteTxEntry<K, V>> recoveryWrites, @Nullable IgniteTxKey grpLockKey ) { - super(xidVer, writeEntries == null ? 0 : writeEntries.size()); + super(xidVer, 0); assert xidVer != null; this.futId = futId; @@ -149,36 +125,12 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes this.syncRollback = syncRollback; this.baseVer = baseVer; this.txSize = txSize; - this.writeEntries = writeEntries; - this.recoveryWrites = recoveryWrites; this.grpLockKey = grpLockKey; completedVersions(committedVers, rolledbackVers); } /** - * Clones write entries so that near entries are not passed to DHT cache. - */ - public void cloneEntries() { - if (F.isEmpty(writeEntries)) - return; - - Collection<IgniteTxEntry<K, V>> cp = new ArrayList<>(writeEntries.size()); - - for (IgniteTxEntry<K, V> e : writeEntries) { - GridCacheContext<K, V> cacheCtx = e.context(); - - // Clone only if it is a near cache. - if (cacheCtx.isNear()) - cp.add(e.cleanCopy(cacheCtx.nearTx().dht().context())); - else - cp.add(e); - } - - writeEntries = cp; - } - - /** * @return System flag. 
*/ public boolean system() { @@ -290,24 +242,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (writeEntries != null) { - marshalTx(writeEntries, ctx); - - writeEntriesBytes = new ArrayList<>(writeEntries.size()); - - for (IgniteTxEntry<K, V> e : writeEntries) - writeEntriesBytes.add(ctx.marshaller().marshal(e)); - } - - if (recoveryWrites != null) { - marshalTx(recoveryWrites, ctx); - - recoveryWritesBytes = new ArrayList<>(recoveryWrites.size()); - - for (IgniteTxEntry<K, V> e : recoveryWrites) - recoveryWritesBytes.add(ctx.marshaller().marshal(e)); - } - if (grpLockKey != null && grpLockKeyBytes == null) { if (ctx.deploymentEnabled()) prepareObject(grpLockKey, ctx); @@ -320,24 +254,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (writeEntriesBytes != null) { - writeEntries = new ArrayList<>(writeEntriesBytes.size()); - - for (byte[] arr : writeEntriesBytes) - writeEntries.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr)); - - unmarshalTx(writeEntries, false, ctx, ldr); - } - - if (recoveryWritesBytes != null) { - recoveryWrites = new ArrayList<>(recoveryWritesBytes.size()); - - for (byte[] arr : recoveryWritesBytes) - recoveryWrites.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr)); - - unmarshalTx(recoveryWrites, false, ctx, ldr); - } - if (grpLockKeyBytes != null && grpLockKey == null) grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr); } @@ -365,10 +281,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes _clone.invalidate = invalidate; _clone.commit = commit; _clone.baseVer = baseVer; - _clone.writeEntries = writeEntries; - 
_clone.writeEntriesBytes = writeEntriesBytes; - _clone.recoveryWrites = recoveryWrites; - _clone.recoveryWritesBytes = recoveryWritesBytes; _clone.txSize = txSize; _clone.grpLockKey = grpLockKey; _clone.grpLockKeyBytes = grpLockKeyBytes; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index f0655b9..5fc3607 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -53,7 +53,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe /** Commit version for EC transactions. */ @GridToStringInclude - private GridCacheVersion commitVer; + private GridCacheVersion writeVer; /** Transaction timeout. */ @GridToStringInclude @@ -112,6 +112,9 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe /** */ private byte[] txNodesBytes; + /** One phase commit flag. */ + private boolean onePhaseCommit; + /** System flag. */ private boolean sys; @@ -129,6 +132,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe * @param grpLockKey Group lock key. * @param partLock {@code True} if preparing group-lock transaction with partition lock. * @param txNodes Transaction nodes mapping. + * @param onePhaseCommit One phase commit flag. 
*/ public GridDistributedTxPrepareRequest( IgniteTxEx<K, V> tx, @@ -136,11 +140,12 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe Collection<IgniteTxEntry<K, V>> writes, IgniteTxKey grpLockKey, boolean partLock, - Map<UUID, Collection<UUID>> txNodes + Map<UUID, Collection<UUID>> txNodes, + boolean onePhaseCommit ) { super(tx.xidVersion(), 0); - commitVer = null; + writeVer = tx.writeVersion(); threadId = tx.threadId(); concurrency = tx.concurrency(); isolation = tx.isolation(); @@ -154,6 +159,29 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe this.grpLockKey = grpLockKey; this.partLock = partLock; this.txNodes = txNodes; + this.onePhaseCommit = onePhaseCommit; + } + + /** + * Clones write entries so that near entries are not passed to DHT cache. + */ + public void cloneEntries() { + if (F.isEmpty(writes)) + return; + + Collection<IgniteTxEntry<K, V>> cp = new ArrayList<>(writes.size()); + + for (IgniteTxEntry<K, V> e : writes) { + GridCacheContext<K, V> cacheCtx = e.context(); + + // Clone only if it is a near cache. + if (cacheCtx.isNear()) + cp.add(e.cleanCopy(cacheCtx.nearTx().dht().context())); + else + cp.add(e); + } + + writes = cp; } /** @@ -200,7 +228,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe /** * @return Commit version. */ - public GridCacheVersion commitVersion() { return commitVer; } + public GridCacheVersion writeVersion() { return writeVer; } /** * @return Invalidate flag. @@ -277,6 +305,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe return txSize; } + /** + * @return One phase commit flag. 
+ */ + public boolean onePhaseCommit() { + return onePhaseCommit; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { @@ -420,7 +455,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe _clone.threadId = threadId; _clone.concurrency = concurrency; _clone.isolation = isolation; - _clone.commitVer = commitVer; + _clone.writeVer = writeVer; _clone.timeout = timeout; _clone.invalidate = invalidate; _clone.reads = reads; @@ -455,7 +490,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe switch (commState.idx) { case 8: - if (!commState.putCacheVersion(commitVer)) + if (!commState.putCacheVersion(writeVer)) return false; commState.idx++; @@ -599,7 +634,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe if (commitVer0 == CACHE_VER_NOT_READ) return false; - commitVer = commitVer0; + writeVer = commitVer0; commState.idx++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 9d07c52..a3ea0502 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -503,34 +503,33 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> GridCacheVersion explicitVer = txEntry.drVersion(); - if (finalizationStatus() == 
FinalizationStatus.RECOVERY_FINISH || optimistic()) { - // Primary node has left the grid so we have to process conflicts on backups. - if (explicitVer == null) - explicitVer = writeVersion(); // Force write version to be used. - - GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached, - txEntry, - explicitVer, - op, - val, - valBytes, - txEntry.ttl(), - txEntry.drExpireTime()); - - if (drRes != null) { - op = drRes.operation(); - val = drRes.value(); - valBytes = drRes.valueBytes(); - - if (drRes.isMerge()) - explicitVer = writeVersion(); - else if (op == NOOP) - txEntry.ttl(-1L); - } - else - // Nullify explicit version so that innerSet/innerRemove will work as usual. - explicitVer = null; + + // Primary node has left the grid so we have to process conflicts on backups. + if (explicitVer == null) + explicitVer = writeVersion(); // Force write version to be used. + + GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached, + txEntry, + explicitVer, + op, + val, + valBytes, + txEntry.ttl(), + txEntry.drExpireTime()); + + if (drRes != null) { + op = drRes.operation(); + val = drRes.value(); + valBytes = drRes.valueBytes(); + + if (drRes.isMerge()) + explicitVer = writeVersion(); + else if (op == NOOP) + txEntry.ttl(-1L); } + else + // Nullify explicit version so that innerSet/innerRemove will work as usual. + explicitVer = null; if (op == CREATE || op == UPDATE) { // Invalidate only for near nodes (backups cannot be invalidated). 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 74d9163..adba497 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -779,8 +779,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo } if (tx != null) { - tx.addDhtMapping(dhtMap); - tx.addNearMapping(nearMap); + tx.addDhtNodeEntryMapping(dhtMap); + tx.addNearNodeEntryMapping(nearMap); tx.needsCompletedVersions(hasRmtNodes); } @@ -851,6 +851,29 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo // Must unswap entry so that isNewLocked returns correct value. e.unswap(true, false); + boolean needVal = false; + + try { + needVal = e.isNewLocked(); + + if (needVal) { + List<ClusterNode> owners = cctx.topology().owners(e.partition(), + tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); + + // Do not preload if local node is partition owner. + if (owners.contains(cctx.localNode())) + needVal = false; + } + } + catch (GridCacheEntryRemovedException ex) { + assert false : "Entry cannot become obsolete when DHT local candidate is added " + + "[e=" + e + ", ex=" + ex + ']'; + } + + // Skip entry if it is not new and is not present in updated mapping. + if (tx != null && !needVal) + continue; + boolean invalidateRdr = e.readerId(n.id()) != null; IgniteTxEntry<K, V> entry = tx != null ? 
tx.entry(e.txKey()) : null; @@ -858,20 +881,12 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo req.addDhtKey( e.key(), e.getOrMarshalKeyBytes(), - tx != null ? tx.writeMap().get(e.txKey()) : null, - entry != null ? entry.drVersion() : null, invalidateRdr, cctx); - try { - if (e.isNewLocked()) - // Mark last added key as needed to be preloaded. - req.markLastKeyForPreload(); - } - catch (GridCacheEntryRemovedException ex) { - assert false : "Entry cannot become obsolete when DHT local candidate is added " + - "[e=" + e + ", ex=" + ex + ']'; - } + if (needVal) + // Mark last added key as needed to be preloaded. + req.markLastKeyForPreload(); it.set(addOwned(req, e)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index a4efd13..050748a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -237,8 +237,6 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { * * @param key Key. * @param keyBytes Key bytes. - * @param writeEntry Write entry. - * @param drVer DR version. * @param invalidateEntry Flag indicating whether node should attempt to invalidate reader. * @param ctx Context. * @throws IgniteCheckedException If failed. 
@@ -246,14 +244,12 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { public void addDhtKey( K key, byte[] keyBytes, - IgniteTxEntry<K, V> writeEntry, - @Nullable GridCacheVersion drVer, boolean invalidateEntry, GridCacheContext<K, V> ctx ) throws IgniteCheckedException { invalidateEntries.set(idx, invalidateEntry); - addKeyBytes(key, keyBytes, writeEntry, false, null, drVer, ctx); + addKeyBytes(key, keyBytes, false, null, ctx); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index f6e69eb..4e06bc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -136,8 +136,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach GridDhtLockResponse<K, V> res) throws IgniteCheckedException, GridDistributedLockCancelledException { List<K> keys = req.keys(); - List<IgniteTxEntry<K, V>> writes = req.writeEntries(); - GridDhtTxRemote<K, V> tx = null; int size = F.size(keys); @@ -150,12 +148,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach IgniteTxKey<K> txKey = ctx.txKey(key); - IgniteTxEntry<K, V> writeEntry = writes == null ? 
null : writes.get(i); - assert F.isEmpty(req.candidatesByIndex(i)); - GridCacheVersion drVer = req.drVersionByIndex(i); - if (log.isDebugEnabled()) log.debug("Unmarshalled key: " + key); @@ -218,13 +212,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach tx.addWrite( ctx, - writeEntry == null ? NOOP : writeEntry.op(), + NOOP, txKey, req.keyBytes() != null ? req.keyBytes().get(i) : null, - writeEntry == null ? null : writeEntry.value(), - writeEntry == null ? null : writeEntry.valueBytes(), - writeEntry == null ? null : writeEntry.entryProcessors(), - drVer, + null, + null, + null, req.accessTtl()); if (req.groupLock()) @@ -828,14 +821,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (log.isDebugEnabled()) log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']'); - assert req.writeEntries() == null || req.writeEntries().size() == entries.size(); - IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync( cacheCtx, entries, - req.writeEntries(), req.onePhaseCommit(), - req.drVersions(), req.messageId(), req.implicitTx(), req.txRead(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index ea68904..fdf6e87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -92,8 +92,6 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter<K, V> tx, boolean commit) { super(cctx.kernalContext(), F.<IgniteTx>identityReducer(tx)); - assert cctx != null; - this.cctx = cctx; this.tx = tx; this.commit = commit; @@ -283,6 +281,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur */ private boolean finish(Map<UUID, GridDistributedTxMapping<K, V>> dhtMap, Map<UUID, GridDistributedTxMapping<K, V>> nearMap) { + if (tx.onePhaseCommit()) + return false; + boolean res = false; boolean sync = commit ? tx.syncCommit() : tx.syncRollback(); @@ -323,10 +324,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.rolledbackVersions(), tx.pendingVersions(), tx.size(), - tx.pessimistic() ? dhtMapping.writes() : null, - tx.pessimistic() && nearMapping != null ? nearMapping.writes() : null, - tx.recoveryWrites(), - tx.onePhaseCommit(), tx.groupLockKey(), tx.subjectId(), tx.taskNameHash()); @@ -345,9 +342,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur } } - if (tx.onePhaseCommit()) - req.writeVersion(tx.writeVersion()); - try { cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); @@ -395,10 +389,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.rolledbackVersions(), tx.pendingVersions(), tx.size(), - null, - tx.pessimistic() ? 
nearMapping.writes() : null, - tx.recoveryWrites(), - tx.onePhaseCommit(), tx.groupLockKey(), tx.subjectId(), tx.taskNameHash()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index da02065..1d311d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -47,15 +47,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest /** Transaction isolation. */ private IgniteTxIsolation isolation; - /** Near writes. */ - @GridToStringInclude - @GridDirectTransient - private Collection<IgniteTxEntry<K, V>> nearWrites; - - /** Serialized near writes. */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> nearWritesBytes; - /** Mini future ID. */ private IgniteUuid miniId; @@ -70,9 +61,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest @GridDirectCollection(GridCacheVersion.class) private Collection<GridCacheVersion> pendingVers; - /** One phase commit flag for fast-commit path. */ - private boolean onePhaseCommit; - /** One phase commit write version. */ private GridCacheVersion writeVer; @@ -117,10 +105,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest * @param rolledbackVers Rolled back versions. * @param pendingVers Pending versions. * @param txSize Expected transaction size. - * @param writes Write entries. 
- * @param nearWrites Near cache writes. - * @param recoverWrites Recovery write entries. - * @param onePhaseCommit One phase commit flag. * @param grpLockKey Group lock key. * @param subjId Subject ID. * @param taskNameHash Task name hash. @@ -145,16 +129,12 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers, int txSize, - Collection<IgniteTxEntry<K, V>> writes, - Collection<IgniteTxEntry<K, V>> nearWrites, - Collection<IgniteTxEntry<K, V>> recoverWrites, - boolean onePhaseCommit, @Nullable IgniteTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) { super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, - committedVers, rolledbackVers, txSize, writes, recoverWrites, grpLockKey); + committedVers, rolledbackVers, txSize, grpLockKey); assert miniId != null; assert nearNodeId != null; @@ -164,10 +144,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest this.topVer = topVer; this.nearNodeId = nearNodeId; this.isolation = isolation; - this.nearWrites = nearWrites; this.miniId = miniId; this.sysInvalidate = sysInvalidate; - this.onePhaseCommit = onePhaseCommit; this.subjId = subjId; this.taskNameHash = taskNameHash; } @@ -178,13 +156,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest } /** - * @return Near writes. - */ - public Collection<IgniteTxEntry<K, V>> nearWrites() { - return nearWrites == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : nearWrites; - } - - /** * @return Mini ID. */ public IgniteUuid miniId() { @@ -227,13 +198,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest } /** - * @return One phase commit flag. - */ - public boolean onePhaseCommit() { - return onePhaseCommit; - } - - /** * @return Write version for one-phase commit transactions. 
*/ public GridCacheVersion writeVersion() { @@ -314,35 +278,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest return nearTtls; } - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (nearWrites != null) { - marshalTx(nearWrites, ctx); - - nearWritesBytes = new ArrayList<>(nearWrites.size()); - - for (IgniteTxEntry<K, V> e : nearWrites) - nearWritesBytes.add(ctx.marshaller().marshal(e)); - } - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (nearWritesBytes != null) { - nearWrites = new ArrayList<>(nearWritesBytes.size()); - - for (byte[] arr : nearWritesBytes) - nearWrites.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr)); - - unmarshalTx(nearWrites, true, ctx, ldr); - } - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtTxFinishRequest.class, this, super.toString()); @@ -366,13 +301,10 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest _clone.nearNodeId = nearNodeId; _clone.isolation = isolation; - _clone.nearWrites = nearWrites; - _clone.nearWritesBytes = nearWritesBytes; _clone.miniId = miniId; _clone.sysInvalidate = sysInvalidate; _clone.topVer = topVer; _clone.pendingVers = pendingVers; - _clone.onePhaseCommit = onePhaseCommit; _clone.writeVer = writeVer; _clone.subjId = subjId; _clone.taskNameHash = taskNameHash;