# ignite-157-2 renamings
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f5f95fb8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f5f95fb8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f5f95fb8 Branch: refs/heads/ignite-373 Commit: f5f95fb8c952996f4479852b1ca2e086d3d57621 Parents: b141abf Author: sboikov <sboi...@gridgain.com> Authored: Wed May 6 09:56:30 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed May 6 09:56:30 2015 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 4 +- .../communication/GridIoMessageFactory.java | 4 +- ...ridCacheOptimisticCheckPreparedTxFuture.java | 508 ------------------- ...idCacheOptimisticCheckPreparedTxRequest.java | 261 ---------- ...dCacheOptimisticCheckPreparedTxResponse.java | 179 ------- .../distributed/GridCacheTxRecoveryFuture.java | 506 ++++++++++++++++++ .../distributed/GridCacheTxRecoveryRequest.java | 261 ++++++++++ .../GridCacheTxRecoveryResponse.java | 182 +++++++ .../cache/transactions/IgniteTxHandler.java | 30 +- .../cache/transactions/IgniteTxManager.java | 98 +--- .../resources/META-INF/classnames.properties | 6 +- 11 files changed, 976 insertions(+), 1063 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index e37b4f3..0540148 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ 
b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -165,8 +165,8 @@ public class MessageCodeGenerator { // gen.generateAndWrite(GridDhtTxFinishRequest.class); // gen.generateAndWrite(GridDhtTxFinishResponse.class); // -// gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxRequest.class); -// gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxResponse.class); +// gen.generateAndWrite(GridCacheTxRecoveryRequest.class); +// gen.generateAndWrite(GridCacheTxRecoveryResponse.class); // gen.generateAndWrite(GridQueryCancelRequest.class); // gen.generateAndWrite(GridQueryFailResponse.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index a395747..7fe8da8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -166,12 +166,12 @@ public class GridIoMessageFactory implements MessageFactory { break; case 16: - msg = new GridCacheOptimisticCheckPreparedTxRequest(); + msg = new GridCacheTxRecoveryRequest(); break; case 17: - msg = new GridCacheOptimisticCheckPreparedTxResponse(); + msg = new GridCacheTxRecoveryResponse(); break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java deleted file mode 100644 index bd3e1cc..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java +++ /dev/null @@ -1,508 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Future verifying that all remote transactions related to some - * optimistic transaction were prepared. - */ -public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompoundIdentityFuture<Boolean> - implements GridCacheFuture<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - private static IgniteLogger log; - - /** Trackable flag. */ - private boolean trackable = true; - - /** Context. */ - private final GridCacheSharedContext<K, V> cctx; - - /** Future ID. */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Transaction. */ - private final IgniteInternalTx tx; - - /** All involved nodes. */ - private final Map<UUID, ClusterNode> nodes; - - /** ID of failed node started transaction. */ - private final UUID failedNodeId; - - /** Transaction nodes mapping. */ - private final Map<UUID, Collection<UUID>> txNodes; - - /** */ - private final boolean nearTxCheck; - - /** - * @param cctx Context. - * @param tx Transaction. - * @param failedNodeId ID of failed node started transaction. - * @param txNodes Transaction mapping. 
- */ - @SuppressWarnings("ConstantConditions") - public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx, - IgniteInternalTx tx, - UUID failedNodeId, - Map<UUID, Collection<UUID>> txNodes) - { - super(cctx.kernalContext(), CU.boolReducer()); - - this.cctx = cctx; - this.tx = tx; - this.txNodes = txNodes; - this.failedNodeId = failedNodeId; - - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridCacheOptimisticCheckPreparedTxFuture.class); - - nodes = new GridLeanMap<>(); - - UUID locNodeId = cctx.localNodeId(); - - for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) { - if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) { - ClusterNode node = cctx.discovery().node(e.getKey()); - - if (node != null) - nodes.put(node.id(), node); - else if (log.isDebugEnabled()) - log.debug("Transaction node left (will ignore) " + e.getKey()); - } - - for (UUID nodeId : e.getValue()) { - if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) { - ClusterNode node = cctx.discovery().node(nodeId); - - if (node != null) - nodes.put(node.id(), node); - else if (log.isDebugEnabled()) - log.debug("Transaction node left (will ignore) " + e.getKey()); - } - } - } - - UUID nearNodeId = tx.eventNodeId(); - - nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId); - } - - /** - * Initializes future. 
- */ - @SuppressWarnings("ConstantConditions") - public void prepare() { - if (nearTxCheck) { - UUID nearNodeId = tx.eventNodeId(); - - if (cctx.localNodeId().equals(nearNodeId)) { - IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion()); - - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - try { - onDone(fut.get()); - } - catch (IgniteCheckedException e) { - onDone(e); - } - } - }); - } - else { - MiniFuture fut = new MiniFuture(tx.eventNodeId()); - - add(fut); - - GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest( - tx, - 0, - true, - futureId(), - fut.futureId()); - - try { - cctx.io().send(nearNodeId, req, tx.ioPolicy()); - } - catch (ClusterTopologyCheckedException e) { - fut.onNodeLeft(); - } - catch (IgniteCheckedException e) { - fut.onError(e); - } - - markInitialized(); - } - - return; - } - - // First check transactions on local node. - int locTxNum = nodeTransactions(cctx.localNodeId()); - - if (locTxNum > 1) { - IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum); - - if (fut == null || fut.isDone()) { - boolean prepared; - - try { - prepared = fut == null ? 
true : fut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Check prepared transaction future failed: " + e, e); - - prepared = false; - } - - if (!prepared) { - onDone(false); - - markInitialized(); - - return; - } - } - else { - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - boolean prepared; - - try { - prepared = fut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Check prepared transaction future failed: " + e, e); - - prepared = false; - } - - if (!prepared) { - onDone(false); - - markInitialized(); - } - else - proceedPrepare(); - } - }); - - return; - } - } - - proceedPrepare(); - } - - /** - * Process prepare after local check. - */ - private void proceedPrepare() { - for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) { - UUID nodeId = entry.getKey(); - - // Skip left nodes and local node. - if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId())) - continue; - - /* - * If primary node failed then send message to all backups, otherwise - * send message only to primary node. - */ - - if (nodeId.equals(failedNodeId)) { - for (UUID id : entry.getValue()) { - // Skip backup node if it is local node or if it is also was mapped as primary. 
- if (txNodes.containsKey(id) || id.equals(cctx.localNodeId())) - continue; - - MiniFuture fut = new MiniFuture(id); - - add(fut); - - GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx, - nodeTransactions(id), - false, - futureId(), - fut.futureId()); - - try { - cctx.io().send(id, req, tx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); - } - catch (IgniteCheckedException e) { - fut.onError(e); - - break; - } - } - } - else { - MiniFuture fut = new MiniFuture(nodeId); - - add(fut); - - GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest( - tx, - nodeTransactions(nodeId), - false, - futureId(), - fut.futureId()); - - try { - cctx.io().send(nodeId, req, tx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); - } - catch (IgniteCheckedException e) { - fut.onError(e); - - break; - } - } - } - - markInitialized(); - } - - /** - * @param nodeId Node ID. - * @return Number of transactions on node. - */ - private int nodeTransactions(UUID nodeId) { - int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary. - - for (Collection<UUID> backups : txNodes.values()) { - for (UUID backup : backups) { - if (backup.equals(nodeId)) { - cnt++; // +1 if node is backup. - - break; - } - } - } - - return cnt; - } - - /** - * @param nodeId Node ID. - * @param res Response. 
- */ - public void onResult(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) { - if (!isDone()) { - for (IgniteInternalFuture<Boolean> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.futureId().equals(res.miniId())) { - assert f.nodeId().equals(nodeId); - - f.onResult(res); - - break; - } - } - } - } - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return futId; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** {@inheritDoc} */ - @Override public Collection<? extends ClusterNode> nodes() { - return nodes.values(); - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteInternalFuture<?> fut : futures()) - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.nodeId().equals(nodeId)) { - f.onNodeLeft(); - - return true; - } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean trackable() { - return trackable; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - trackable = false; - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { - if (super.onDone(res, err)) { - cctx.mvcc().removeFuture(this); - - if (err == null) { - assert res != null; - - cctx.tm().finishOptimisticTxOnRecovery(tx, res); - } - else { - if (err instanceof ClusterTopologyCheckedException && nearTxCheck) { - if (log.isDebugEnabled()) - log.debug("Failed to check transaction on near node, " + - "ignoring [err=" + err + ", tx=" + tx + ']'); - } - else { - if (log.isDebugEnabled()) - log.debug("Failed to check prepared transactions, " + - "invalidating transaction [err=" + err + ", tx=" + tx + ']'); - - cctx.tm().salvageTx(tx); - } - } - } - - return false; - } - - /** - * @param f Future. - * @return {@code True} if mini-future. 
- */ - private boolean isMini(IgniteInternalFuture<?> f) { - return f.getClass().equals(MiniFuture.class); - } - - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheOptimisticCheckPreparedTxFuture.class, this, "super", super.toString()); - } - - /** - * - */ - private class MiniFuture extends GridFutureAdapter<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - - /** Mini future ID. */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Node ID. */ - private UUID nodeId; - - /** - * @param nodeId Node ID. - */ - private MiniFuture(UUID nodeId) { - this.nodeId = nodeId; - } - - /** - * @return Node ID. - */ - private UUID nodeId() { - return nodeId; - } - - /** - * @return Future ID. - */ - private IgniteUuid futureId() { - return futId; - } - - /** - * @param e Error. - */ - private void onError(Throwable e) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - onDone(e); - } - - /** - */ - private void onNodeLeft() { - if (log.isDebugEnabled()) - log.debug("Transaction node left grid (will ignore) [fut=" + this + ']'); - - if (nearTxCheck) { - // Near and originating nodes left, need initiate tx check. - cctx.tm().commitIfPrepared(tx); - - onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore).")); - } - else - onDone(true); - } - - /** - * @param res Result callback. 
- */ - private void onResult(GridCacheOptimisticCheckPreparedTxResponse res) { - onDone(res.success()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MiniFuture.class, this, "done", isDone(), "err", error()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java deleted file mode 100644 index 4f2a1d6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.io.*; -import java.nio.*; - -/** - * Message sent to check that transactions related to transaction were prepared on remote node. - */ -public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Future ID. */ - private IgniteUuid futId; - - /** Mini future ID. */ - private IgniteUuid miniId; - - /** Near transaction ID. */ - private GridCacheVersion nearXidVer; - - /** Expected number of transactions on node. */ - private int txNum; - - /** System transaction flag. */ - private boolean sys; - - /** {@code True} if should check only tx on near node. */ - private boolean nearTxCheck; - - /** - * Empty constructor required by {@link Externalizable} - */ - public GridCacheOptimisticCheckPreparedTxRequest() { - // No-op. - } - - /** - * @param tx Transaction. - * @param txNum Expected number of transactions on remote node. - * @param nearTxCheck - * @param futId Future ID. - * @param miniId Mini future ID. - */ - public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, - int txNum, - boolean nearTxCheck, - IgniteUuid futId, - IgniteUuid miniId) - { - super(tx.xidVersion(), 0); - - nearXidVer = tx.nearXidVersion(); - sys = tx.system(); - - this.futId = futId; - this.miniId = miniId; - this.txNum = txNum; - this.nearTxCheck = nearTxCheck; - } - - /** - * @return {@code True} if should check only tx on near node. - */ - public boolean nearTxCheck() { - return nearTxCheck; - } - - /** - * @return Near version. 
- */ - public GridCacheVersion nearXidVersion() { - return nearXidVer; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Mini future ID. - */ - public IgniteUuid miniId() { - return miniId; - } - - /** - * @return Expected number of transactions on node. - */ - public int transactions() { - return txNum; - } - - /** - * @return System transaction flag. - */ - public boolean system() { - return sys; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 8: - if (!writer.writeIgniteUuid("futId", futId)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeIgniteUuid("miniId", miniId)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeBoolean("nearTxCheck", nearTxCheck)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeMessage("nearXidVer", nearXidVer)) - return false; - - writer.incrementState(); - - case 12: - if (!writer.writeBoolean("sys", sys)) - return false; - - writer.incrementState(); - - case 13: - if (!writer.writeInt("txNum", txNum)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 8: - futId = reader.readIgniteUuid("futId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - miniId = reader.readIgniteUuid("miniId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - 
- case 10: - nearTxCheck = reader.readBoolean("nearTxCheck"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - nearXidVer = reader.readMessage("nearXidVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: - sys = reader.readBoolean("sys"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: - txNum = reader.readInt("txNum"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 16; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 14; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheOptimisticCheckPreparedTxRequest.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java deleted file mode 100644 index bc8c2e0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.io.*; -import java.nio.*; - -/** - * Check prepared transactions response. - */ -public class GridCacheOptimisticCheckPreparedTxResponse extends GridDistributedBaseMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Future ID. */ - private IgniteUuid futId; - - /** Mini future ID. */ - private IgniteUuid miniId; - - /** Flag indicating if all remote transactions were prepared. */ - private boolean success; - - /** - * Empty constructor required by {@link Externalizable} - */ - public GridCacheOptimisticCheckPreparedTxResponse() { - // No-op. - } - - /** - * @param txId Transaction ID. - * @param futId Future ID. - * @param miniId Mini future ID. - * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise. - */ - public GridCacheOptimisticCheckPreparedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId, - boolean success) { - super(txId, 0); - - this.futId = futId; - this.miniId = miniId; - this.success = success; - } - - /** - * @return Future ID. 
- */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Mini future ID. - */ - public IgniteUuid miniId() { - return miniId; - } - - /** - * @return {@code True} if all remote transactions were prepared. - */ - public boolean success() { - return success; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 8: - if (!writer.writeIgniteUuid("futId", futId)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeIgniteUuid("miniId", miniId)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeBoolean("success", success)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 8: - futId = reader.readIgniteUuid("futId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - miniId = reader.readIgniteUuid("miniId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - success = reader.readBoolean("success"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 17; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 11; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheOptimisticCheckPreparedTxResponse.class, this, "super", super.toString()); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java new file mode 100644 index 0000000..663ed90 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Future verifying that all remote transactions related to transaction were prepared or committed. + */ +public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> { + /** */ + private static final long serialVersionUID = 0L; + + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + private static IgniteLogger log; + + /** Trackable flag. */ + private boolean trackable = true; + + /** Context. */ + private final GridCacheSharedContext<?, ?> cctx; + + /** Future ID. */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** Transaction. */ + private final IgniteInternalTx tx; + + /** All involved nodes. */ + private final Map<UUID, ClusterNode> nodes; + + /** ID of failed node started transaction. */ + private final UUID failedNodeId; + + /** Transaction nodes mapping. */ + private final Map<UUID, Collection<UUID>> txNodes; + + /** */ + private final boolean nearTxCheck; + + /** + * @param cctx Context. + * @param tx Transaction. + * @param failedNodeId ID of failed node started transaction. + * @param txNodes Transaction mapping. 
+ */ + @SuppressWarnings("ConstantConditions") + public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx, + IgniteInternalTx tx, + UUID failedNodeId, + Map<UUID, Collection<UUID>> txNodes) + { + super(cctx.kernalContext(), CU.boolReducer()); + + this.cctx = cctx; + this.tx = tx; + this.txNodes = txNodes; + this.failedNodeId = failedNodeId; + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class); + + nodes = new GridLeanMap<>(); + + UUID locNodeId = cctx.localNodeId(); + + for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) { + if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) { + ClusterNode node = cctx.discovery().node(e.getKey()); + + if (node != null) + nodes.put(node.id(), node); + else if (log.isDebugEnabled()) + log.debug("Transaction node left (will ignore) " + e.getKey()); + } + + for (UUID nodeId : e.getValue()) { + if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) { + ClusterNode node = cctx.discovery().node(nodeId); + + if (node != null) + nodes.put(node.id(), node); + else if (log.isDebugEnabled()) + log.debug("Transaction node left (will ignore) " + e.getKey()); + } + } + } + + UUID nearNodeId = tx.eventNodeId(); + + nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId); + } + + /** + * Initializes future. 
+ */ + @SuppressWarnings("ConstantConditions") + public void prepare() { + if (nearTxCheck) { + UUID nearNodeId = tx.eventNodeId(); + + if (cctx.localNodeId().equals(nearNodeId)) { + IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion()); + + fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { + try { + onDone(fut.get()); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + else { + MiniFuture fut = new MiniFuture(tx.eventNodeId()); + + add(fut); + + GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest( + tx, + 0, + true, + futureId(), + fut.futureId()); + + try { + cctx.io().send(nearNodeId, req, tx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + fut.onNodeLeft(); + } + catch (IgniteCheckedException e) { + fut.onError(e); + } + + markInitialized(); + } + + return; + } + + // First check transactions on local node. + int locTxNum = nodeTransactions(cctx.localNodeId()); + + if (locTxNum > 1) { + IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum); + + if (fut == null || fut.isDone()) { + boolean prepared; + + try { + prepared = fut == null ? true : fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Check prepared transaction future failed: " + e, e); + + prepared = false; + } + + if (!prepared) { + onDone(false); + + markInitialized(); + + return; + } + } + else { + fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { + boolean prepared; + + try { + prepared = fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Check prepared transaction future failed: " + e, e); + + prepared = false; + } + + if (!prepared) { + onDone(false); + + markInitialized(); + } + else + proceedPrepare(); + } + }); + + return; + } + } + + proceedPrepare(); + } + + /** + * Process prepare after local check. 
+     */
+    private void proceedPrepare() {
+        // Walks the primary -> backups mapping, creating one mini-future per recovery request sent.
+        for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
+            UUID nodeId = entry.getKey();
+
+            // Skip left nodes and local node.
+            // NOTE(review): with '&&' this skips an entry only when it is the local node AND absent from
+            // 'nodes'; the constructor never puts the local node into 'nodes', so in effect only the
+            // local node is skipped. Left nodes appear to be handled by the send failure path below.
+            // The failed primary is also absent from 'nodes' and must still reach the branch below.
+            if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
+                continue;
+
+            /*
+             * If primary node failed then send message to all backups, otherwise
+             * send message only to primary node.
+             */
+
+            if (nodeId.equals(failedNodeId)) {
+                for (UUID id : entry.getValue()) {
+                    // Skip backup node if it is local node or if it is also was mapped as primary.
+                    if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
+                        continue;
+
+                    MiniFuture fut = new MiniFuture(id);
+
+                    // Mini-future is registered before sending so that a response can find it.
+                    add(fut);
+
+                    GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(tx,
+                        nodeTransactions(id),
+                        false,
+                        futureId(),
+                        fut.futureId());
+
+                    try {
+                        cctx.io().send(id, req, tx.ioPolicy());
+                    }
+                    catch (ClusterTopologyCheckedException ignored) {
+                        fut.onNodeLeft();
+                    }
+                    catch (IgniteCheckedException e) {
+                        fut.onError(e);
+
+                        // Leaves only the inner loop over backups; the outer loop continues.
+                        break;
+                    }
+                }
+            }
+            else {
+                MiniFuture fut = new MiniFuture(nodeId);
+
+                add(fut);
+
+                GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+                    tx,
+                    nodeTransactions(nodeId),
+                    false,
+                    futureId(),
+                    fut.futureId());
+
+                try {
+                    cctx.io().send(nodeId, req, tx.ioPolicy());
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    fut.onNodeLeft();
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onError(e);
+
+                    // Leaves the outer loop: no further requests are sent after a send error.
+                    break;
+                }
+            }
+        }
+
+        markInitialized();
+    }
+
+    /**
+     * Counts how many times the node takes part in the transaction mapping: once if it is a key
+     * (primary) of {@code txNodes}, plus once for every primary whose backup collection contains it.
+     *
+     * @param nodeId Node ID.
+     * @return Number of transactions on node.
+     */
+    private int nodeTransactions(UUID nodeId) {
+        int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
+
+        for (Collection<UUID> backups : txNodes.values()) {
+            for (UUID backup : backups) {
+                if (backup.equals(nodeId)) {
+                    cnt++; // +1 if node is backup.
+
+                    // Count the node at most once per backup collection.
+                    break;
+                }
+            }
+        }
+
+        return cnt;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+ */ + public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) { + if (!isDone()) { + for (IgniteInternalFuture<Boolean> fut : pending()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.nodeId().equals(nodeId); + + f.onResult(res); + + break; + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return tx.xidVersion(); + } + + /** {@inheritDoc} */ + @Override public Collection<? extends ClusterNode> nodes() { + return nodes.values(); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + for (IgniteInternalFuture<?> fut : futures()) + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.nodeId().equals(nodeId)) { + f.onNodeLeft(); + + return true; + } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return trackable; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + trackable = false; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + cctx.mvcc().removeFuture(this); + + if (err == null) { + assert res != null; + + cctx.tm().finishTxOnRecovery(tx, res); + } + else { + if (err instanceof ClusterTopologyCheckedException && nearTxCheck) { + if (log.isDebugEnabled()) + log.debug("Failed to check transaction on near node, " + + "ignoring [err=" + err + ", tx=" + tx + ']'); + } + else { + if (log.isDebugEnabled()) + log.debug("Failed to check prepared transactions, " + + "invalidating transaction [err=" + err + ", tx=" + tx + ']'); + + cctx.tm().salvageTx(tx); + } + } + } + + return false; + } + + /** + * @param f Future. + * @return {@code True} if mini-future. 
+     */
+    private boolean isMini(IgniteInternalFuture<?> f) {
+        // Exact class match: subclasses of MiniFuture would not be recognized.
+        return f.getClass().equals(MiniFuture.class);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTxRecoveryFuture.class, this, "super", super.toString());
+    }
+
+    /**
+     * Mini-future tracking the recovery check for a single remote node. It is completed by the
+     * node's response, by a send error, or when the node leaves the grid.
+     */
+    private class MiniFuture extends GridFutureAdapter<Boolean> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Mini future ID. */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /** Node ID. */
+        private UUID nodeId;
+
+        /**
+         * @param nodeId Node ID.
+         */
+        private MiniFuture(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        private UUID nodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @return Future ID.
+         */
+        private IgniteUuid futureId() {
+            return futId;
+        }
+
+        /**
+         * Completes this mini-future with the given error.
+         *
+         * @param e Error.
+         */
+        private void onError(Throwable e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+            onDone(e);
+        }
+
+        /**
+         * Handles the case when the node this mini-future waits for has left the grid. During the
+         * near-node check a full transaction check is started and this mini-future fails with a
+         * topology exception; otherwise the left node is ignored and the mini-future completes with
+         * {@code true}.
+         */
+        private void onNodeLeft() {
+            if (log.isDebugEnabled())
+                log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
+
+            if (nearTxCheck) {
+                // Near and originating nodes left, need initiate tx check.
+                cctx.tm().commitIfPrepared(tx);
+
+                onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+            }
+            else
+                onDone(true);
+        }
+
+        /**
+         * Completes this mini-future with the success flag carried by the response.
+         *
+         * @param res Result callback.
+         */
+        private void onResult(GridCacheTxRecoveryResponse res) {
+            onDone(res.success());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
new file mode 100644
index 0000000..259c288
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Message sent to check that transactions related to transaction were prepared on remote node.
+ * <p>
+ * With the {@code nearTxCheck} flag set, only the transaction on the near node is to be checked.
+ */
+public class GridCacheTxRecoveryRequest extends GridDistributedBaseMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private IgniteUuid miniId;
+
+    /** Near transaction ID. */
+    private GridCacheVersion nearXidVer;
+
+    /** Expected number of transactions on node. */
+    private int txNum;
+
+    /** System transaction flag. */
+    private boolean sys;
+
+    /** {@code True} if should check only tx on near node. */
+    private boolean nearTxCheck;
+
+    /**
+     * Empty constructor required by {@link Externalizable}
+     */
+    public GridCacheTxRecoveryRequest() {
+        // No-op.
+    }
+
+    /**
+     * Creates a request for the given transaction; the near version and the system flag are
+     * taken from the transaction itself.
+     *
+     * @param tx Transaction.
+     * @param txNum Expected number of transactions on remote node.
+     * @param nearTxCheck {@code True} if should check only tx on near node.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     */
+    public GridCacheTxRecoveryRequest(IgniteInternalTx tx,
+        int txNum,
+        boolean nearTxCheck,
+        IgniteUuid futId,
+        IgniteUuid miniId)
+    {
+        super(tx.xidVersion(), 0);
+
+        nearXidVer = tx.nearXidVersion();
+        sys = tx.system();
+
+        this.futId = futId;
+        this.miniId = miniId;
+        this.txNum = txNum;
+        this.nearTxCheck = nearTxCheck;
+    }
+
+    /**
+     * @return {@code True} if should check only tx on near node.
+     */
+    public boolean nearTxCheck() {
+        return nearTxCheck;
+    }
+
+    /**
+     * @return Near version.
+     */
+    public GridCacheVersion nearXidVersion() {
+        return nearXidVer;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public IgniteUuid miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return Expected number of transactions on node.
+     */
+    public int transactions() {
+        return txNum;
+    }
+
+    /**
+     * @return System transaction flag.
+     */
+    public boolean system() {
+        return sys;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Own fields are written in states 8-13. The switch cases fall through on purpose, so writing
+     * continues from {@code writer.state()}; {@code false} is returned as soon as a field could
+     * not be written.
+     */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        // NOTE(review): states below 8 are presumably handled by the superclass - confirm.
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 8:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeIgniteUuid("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeBoolean("sys", sys))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeInt("txNum", txNum))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Mirrors {@link #writeTo(ByteBuffer, MessageWriter)}: own fields are read in states 8-13 with
+     * intentional fall-through; {@code false} is returned when the last read did not complete.
+     */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 8:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                miniId = reader.readIgniteUuid("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                nearTxCheck = reader.readBoolean("nearTxCheck");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                nearXidVer = reader.readMessage("nearXidVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                sys = reader.readBoolean("sys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                txNum = reader.readInt("txNum");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Type 16 is the code under which {@code GridIoMessageFactory} creates this message.
+     */
+    @Override public byte directType() {
+        return 16;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * NOTE(review): 14 presumably is 8 inherited fields plus the 6 fields written here - confirm.
+     */
+    @Override public byte fieldsCount() {
+        return 14;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTxRecoveryRequest.class, this, "super", super.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
new file mode 100644
index 0000000..e5c026a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Transactions recovery check response.
+ * <p>
+ * Carries the future and mini-future IDs it answers together with the check result.
+ */
+public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private IgniteUuid miniId;
+
+    /** Flag indicating if all remote transactions were prepared. */
+    private boolean success;
+
+    /**
+     * Empty constructor required by {@link Externalizable}
+     */
+    public GridCacheTxRecoveryResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param txId Transaction ID.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
+     */
+    public GridCacheTxRecoveryResponse(GridCacheVersion txId,
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        boolean success)
+    {
+        super(txId, 0);
+
+        this.futId = futId;
+        this.miniId = miniId;
+        this.success = success;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public IgniteUuid miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return {@code True} if all remote transactions were prepared.
+     */
+    public boolean success() {
+        return success;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Own fields are written in states 8-10. The switch cases fall through on purpose, so writing
+     * continues from {@code writer.state()}; {@code false} is returned as soon as a field could
+     * not be written.
+     */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        // NOTE(review): states below 8 are presumably handled by the superclass - confirm.
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 8:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeIgniteUuid("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("success", success))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Mirrors {@link #writeTo(ByteBuffer, MessageWriter)}: own fields are read in states 8-10 with
+     * intentional fall-through; {@code false} is returned when the last read did not complete.
+     */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 8:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                miniId = reader.readIgniteUuid("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                success = reader.readBoolean("success");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Type 17 is the code under which {@code GridIoMessageFactory} creates this message.
+     */
+    @Override public byte directType() {
+        return 17;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * NOTE(review): 11 presumably is 8 inherited fields plus the 3 fields written here - confirm.
+     */
+    @Override public byte fieldsCount() {
+        return 11;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTxRecoveryResponse.class, this, "super", super.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 2897e30..af75fb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -119,16 +119,16 @@ public class IgniteTxHandler { } }); - ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class, - new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest>() { - @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest req) { + ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class, + new CI2<UUID, GridCacheTxRecoveryRequest>() { + @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) { processCheckPreparedTxRequest(nodeId, req); } }); - ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class, - new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse>() { - @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) { + ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class, + new CI2<UUID, GridCacheTxRecoveryResponse>() { + @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) { processCheckPreparedTxResponse(nodeId, res); } }); @@ -138,6 +138,7 @@ public class IgniteTxHandler { * @param nearNodeId Near node ID that initiated transaction. * @param locTx Optional local transaction. * @param req Near prepare request. + * @param completeCb Completion callback. * @return Future for transaction. */ public IgniteInternalFuture<IgniteInternalTx> prepareTx( @@ -170,6 +171,7 @@ public class IgniteTxHandler { * * @param locTx Local transaction. * @param req Near prepare request. + * @param completeCb Completion callback. * @return Prepare future. 
*/ private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx( @@ -177,7 +179,6 @@ public class IgniteTxHandler { final GridNearTxPrepareRequest req, final IgniteInClosure<GridNearTxPrepareResponse> completeCb ) { - IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys. return new GridEmbeddedFuture<>( @@ -223,6 +224,7 @@ public class IgniteTxHandler { * * @param nearNodeId Near node ID that initiated transaction. * @param req Near prepare request. + * @param completeCb Completion callback. * @return Prepare future. */ private IgniteInternalFuture<IgniteInternalTx> prepareNearTx( @@ -442,6 +444,7 @@ public class IgniteTxHandler { /** * @param nodeId Node ID. + * @param locTx Local transaction. * @param req Request. * @return Future. */ @@ -1099,6 +1102,7 @@ public class IgniteTxHandler { } /** + * @param cacheCtx Context. * @param key Key * @param ver Version. * @throws IgniteCheckedException If invalidate failed. @@ -1183,7 +1187,7 @@ public class IgniteTxHandler { * @param req Request. */ protected void processCheckPreparedTxRequest(final UUID nodeId, - final GridCacheOptimisticCheckPreparedTxRequest req) + final GridCacheTxRecoveryRequest req) { if (log.isDebugEnabled()) log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']'); @@ -1231,10 +1235,10 @@ public class IgniteTxHandler { * @param prepared {@code True} if all transaction prepared or committed. 
*/ private void sendCheckPreparedResponse(UUID nodeId, - GridCacheOptimisticCheckPreparedTxRequest req, + GridCacheTxRecoveryRequest req, boolean prepared) { - GridCacheOptimisticCheckPreparedTxResponse res = - new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared); + GridCacheTxRecoveryResponse res = + new GridCacheTxRecoveryResponse(req.version(), req.futureId(), req.miniId(), prepared); try { if (log.isDebugEnabled()) @@ -1256,11 +1260,11 @@ public class IgniteTxHandler { * @param nodeId Node ID. * @param res Response. */ - protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) { + protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) { if (log.isDebugEnabled()) log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - GridCacheOptimisticCheckPreparedTxFuture fut = (GridCacheOptimisticCheckPreparedTxFuture)ctx.mvcc(). + GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc(). 
<Boolean>future(res.version(), res.futureId()); if (fut == null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 85b3ad0..8a1d490 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1931,40 +1931,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Gets local transaction for pessimistic tx recovery. - * - * @param nearXidVer Near tx ID. - * @return Near local or colocated local transaction. - */ - @Nullable public IgniteInternalTx localTxForRecovery(GridCacheVersion nearXidVer, boolean markFinalizing) { - // First check if we have near transaction with this ID. - IgniteInternalTx tx = idMap.get(nearXidVer); - - if (tx == null) { - // Check all local transactions and mark them as waiting for recovery to prevent finish race. - for (IgniteInternalTx txEx : idMap.values()) { - if (nearXidVer.equals(txEx.nearXidVersion())) { - if (!markFinalizing || !txEx.markFinalizing(RECOVERY_WAIT)) - tx = txEx; - } - } - } - - // Either we found near transaction or one of transactions is being committed by user. - // Wait for it and send reply. - if (tx != null && tx.local()) - return tx; - - return null; - } - - /** * Commits or rolls back prepared transaction. * * @param tx Transaction. * @param commit Whether transaction should be committed or rolled back. 
*/ - public void finishOptimisticTxOnRecovery(final IgniteInternalTx tx, boolean commit) { + public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) { if (log.isDebugEnabled()) log.debug("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']'); @@ -1989,71 +1961,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Commits or rolls back pessimistic transaction. - * - * @param tx Transaction to finish. - * @param commitInfo Commit information. - */ - public void finishPessimisticTxOnRecovery(final IgniteInternalTx tx, GridCacheCommittedTxInfo commitInfo) { - if (!tx.markFinalizing(RECOVERY_FINISH)) { - if (log.isDebugEnabled()) - log.debug("Will not try to finish pessimistic transaction (could not mark as finalizing): " + tx); - - return; - } - - if (tx instanceof GridDistributedTxRemoteAdapter) { - IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx; - - rmtTx.doneRemote(tx.xidVersion(), - Collections.<GridCacheVersion>emptyList(), - Collections.<GridCacheVersion>emptyList(), - Collections.<GridCacheVersion>emptyList()); - } - - try { - tx.prepare(); - - if (commitInfo != null) { - for (IgniteTxEntry entry : commitInfo.recoveryWrites()) { - IgniteTxEntry write = tx.writeMap().get(entry.txKey()); - - if (write != null) { - GridCacheEntryEx cached = write.cached(); - - IgniteTxEntry recovered = entry.cleanCopy(write.context()); - - if (cached == null || cached.detached()) - cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion()); - - recovered.cached(cached); - - tx.writeMap().put(entry.txKey(), recovered); - - continue; - } - - // If write was not found, check read. 
- IgniteTxEntry read = tx.readMap().remove(entry.txKey()); - - if (read != null) - tx.writeMap().put(entry.txKey(), entry); - } - - tx.commitAsync().listen(new CommitListener(tx)); - } - else - tx.rollbackAsync(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare pessimistic transaction (will invalidate): " + tx, e); - - salvageTx(tx); - } - } - - /** - * Commits optimistic transaction in case when node started transaction failed, but all related + * Commits transaction in case when node started transaction failed, but all related * transactions were prepared (invalidates transaction if it is not fully prepared). * * @param tx Transaction. @@ -2063,7 +1971,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert !F.isEmpty(tx.transactionNodes()) : tx; assert tx.nearXidVersion() != null : tx; - GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>( + GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture( cctx, tx, tx.originatingNodeId(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 35495ed..657f4af 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -455,9 +455,9 @@ org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresMa org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$RemoveSetDataCallable org.apache.ignite.internal.processors.cache.distributed.GridCacheCommittedTxInfo 
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxFuture$1 -org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxRequest -org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxResponse +org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture$1 +org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest +org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest org.apache.ignite.internal.processors.cache.distributed.GridDistributedBaseMessage org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter