http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java deleted file mode 100644 index e0759c9..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePerThreadTxCommitBuffer.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.timeout.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Committed tx buffer which should be used in synchronous commit mode. - */ -public class GridCachePerThreadTxCommitBuffer<K, V> implements GridCacheTxCommitBuffer<K, V> { - /** Logger. */ - private IgniteLogger log; - - /** Cache context. */ - private GridCacheSharedContext<K, V> cctx; - - /** Store map. */ - private Map<StoreKey, GridCacheCommittedTxInfo<K, V>> infoMap; - - /** - * @param cctx Cache context. - */ - public GridCachePerThreadTxCommitBuffer(GridCacheSharedContext<K, V> cctx) { - this.cctx = cctx; - - log = cctx.logger(GridCachePerThreadTxCommitBuffer.class); - - int logSize = cctx.txConfig().getPessimisticTxLogSize(); - - infoMap = logSize > 0 ? 
- new GridBoundedConcurrentLinkedHashMap<StoreKey, GridCacheCommittedTxInfo<K, V>>(logSize) : - new ConcurrentHashMap8<StoreKey, GridCacheCommittedTxInfo<K, V>>(); - } - - /** {@inheritDoc} */ - @Override public void addCommittedTx(IgniteTxEx<K, V> tx) { - long threadId = tx.threadId(); - - StoreKey key = new StoreKey(tx.eventNodeId(), threadId); - - if (log.isDebugEnabled()) - log.debug("Adding committed transaction [locNodeId=" + cctx.localNodeId() + ", key=" + key + - ", tx=" + tx + ']'); - - infoMap.put(key, new GridCacheCommittedTxInfo<>(tx)); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridCacheCommittedTxInfo<K, V> committedTx(GridCacheVersion originatingTxVer, - UUID nodeId, long threadId) { - assert originatingTxVer != null; - - StoreKey key = new StoreKey(nodeId, threadId); - - GridCacheCommittedTxInfo<K, V> txInfo = infoMap.get(key); - - if (log.isDebugEnabled()) - log.debug("Got committed transaction info by key [locNodeId=" + cctx.localNodeId() + - ", key=" + key + ", originatingTxVer=" + originatingTxVer + ", txInfo=" + txInfo + ']'); - - if (txInfo == null || !originatingTxVer.equals(txInfo.originatingTxId())) - return null; - - return txInfo; - } - - /** - * @param nodeId Left node ID. - */ - @Override public void onNodeLeft(UUID nodeId) { - // Clear all node's records after clear interval. - cctx.kernalContext().timeout().addTimeoutObject( - new NodeLeftTimeoutObject(cctx.txConfig().getPessimisticTxLogLinger(), nodeId)); - } - - /** {@inheritDoc} */ - @Override public int size() { - return infoMap.size(); - } - - /** - * Store key. - */ - private static class StoreKey { - /** Node ID which started transaction. */ - private UUID nodeId; - - /** Thread ID which started transaction. */ - private long threadId; - - /** - * @param nodeId Node ID. - * @param threadId Thread ID. - */ - private StoreKey(UUID nodeId, long threadId) { - this.nodeId = nodeId; - this.threadId = threadId; - } - - /** - * @return Node ID. 
- */ - public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - StoreKey storeKey = (StoreKey)o; - - return threadId == storeKey.threadId && nodeId.equals(storeKey.nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + (int)(threadId ^ (threadId >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(StoreKey.class, this); - } - } - - /** - * Node left timeout object which will clear all committed records from left node. - */ - private class NodeLeftTimeoutObject extends GridTimeoutObjectAdapter { - /** Left node ID. */ - private UUID leftNodeId; - - /** - * @param timeout Timeout. - * @param leftNodeId Left node ID. - */ - protected NodeLeftTimeoutObject(long timeout, UUID leftNodeId) { - super(timeout); - - this.leftNodeId = leftNodeId; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - Iterator<StoreKey> it = infoMap.keySet().iterator(); - - while (it.hasNext()) { - StoreKey key = it.next(); - - if (leftNodeId.equals(key.nodeId())) - it.remove(); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java deleted file mode 100644 index bd902cb..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Future verifying that all remote transactions related to some - * optimistic transaction were prepared. - */ -public class GridCachePessimisticCheckCommittedTxFuture<K, V> extends GridCompoundIdentityFuture<GridCacheCommittedTxInfo<K, V>> - implements GridCacheFuture<GridCacheCommittedTxInfo<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Trackable flag. */ - private boolean trackable = true; - - /** Context. */ - private final GridCacheSharedContext<K, V> cctx; - - /** Future ID. */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Transaction. */ - private final IgniteTxEx<K, V> tx; - - /** All involved nodes. */ - private final Map<UUID, ClusterNode> nodes; - - /** ID of failed node started transaction. */ - private final UUID failedNodeId; - - /** Flag indicating that future checks near node instead of checking all topology in case of primary node crash. */ - private boolean nearCheck; - - /** - * @param cctx Context. - * @param tx Transaction. - * @param failedNodeId ID of failed node started transaction. 
- */ - @SuppressWarnings("ConstantConditions") - public GridCachePessimisticCheckCommittedTxFuture(GridCacheSharedContext<K, V> cctx, IgniteTxEx<K, V> tx, - UUID failedNodeId) { - super(cctx.kernalContext(), new SingleReducer<K, V>()); - - this.cctx = cctx; - this.tx = tx; - this.failedNodeId = failedNodeId; - - nodes = new GridLeanMap<>(); - - for (ClusterNode node : CU.allNodes(cctx, tx.topologyVersion())) - nodes.put(node.id(), node); - } - - /** - * Initializes future. - */ - public void prepare() { - if (log.isDebugEnabled()) - log.debug("Checking if transaction was committed on remote nodes: " + tx); - - // Check local node first (local node can be a backup node for some part of this transaction). - long originatingThreadId = tx.threadId(); - - if (tx instanceof IgniteTxRemoteEx) - originatingThreadId = ((IgniteTxRemoteEx)tx).remoteThreadId(); - - GridCacheCommittedTxInfo<K, V> txInfo = cctx.tm().txCommitted(tx.nearXidVersion(), tx.eventNodeId(), - originatingThreadId); - - if (txInfo != null) { - onDone(txInfo); - - markInitialized(); - - return; - } - - Collection<ClusterNode> checkNodes = CU.remoteNodes(cctx, tx.topologyVersion()); - - if (tx instanceof GridDhtTxRemote) { - // If we got primary node failure and near node has not failed. - if (tx.nodeId().equals(failedNodeId) && !tx.eventNodeId().equals(failedNodeId)) { - nearCheck = true; - - ClusterNode nearNode = cctx.discovery().node(tx.eventNodeId()); - - if (nearNode == null) { - // Near node failed, separate check prepared future will take care of it. - onDone(new ClusterTopologyException("Failed to check near transaction state (near node left grid): " + - tx.eventNodeId())); - - return; - } - - checkNodes = Collections.singletonList(nearNode); - } - } - - for (ClusterNode rmtNode : checkNodes) { - // Skip left nodes and local node. 
- if (rmtNode.id().equals(failedNodeId)) - continue; - - GridCachePessimisticCheckCommittedTxRequest<K, V> req = new GridCachePessimisticCheckCommittedTxRequest<>( - tx, - originatingThreadId, futureId(), nearCheck); - - if (rmtNode.isLocal()) - add(cctx.tm().checkPessimisticTxCommitted(req)); - else { - MiniFuture fut = new MiniFuture(rmtNode.id()); - - req.miniId(fut.futureId()); - - add(fut); - - try { - cctx.io().send(rmtNode.id(), req); - } - catch (ClusterTopologyException ignored) { - fut.onNodeLeft(); - } - catch (IgniteCheckedException e) { - fut.onError(e); - - break; - } - } - } - - markInitialized(); - } - - /** - * @param nodeId Node ID. - * @param res Response. - */ - public void onResult(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - if (!isDone()) { - for (IgniteFuture<GridCacheCommittedTxInfo<K, V>> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.futureId().equals(res.miniId())) { - assert f.nodeId().equals(nodeId); - - f.onResult(res); - - break; - } - } - } - } - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return futId; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** {@inheritDoc} */ - @Override public Collection<? 
extends ClusterNode> nodes() { - return nodes.values(); - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - for (IgniteFuture<?> fut : futures()) - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.nodeId().equals(nodeId)) { - f.onNodeLeft(); - - return true; - } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean trackable() { - return trackable; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - trackable = false; - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable GridCacheCommittedTxInfo<K, V> res, @Nullable Throwable err) { - if (super.onDone(res, err)) { - cctx.mvcc().removeFuture(this); - - if (log.isDebugEnabled()) - log.debug("Completing check committed tx future for transaction [tx=" + tx + ", res=" + res + - ", err=" + err + ']'); - - if (err == null) - cctx.tm().finishPessimisticTxOnRecovery(tx, res); - else { - if (log.isDebugEnabled()) - log.debug("Failed to check prepared transactions, " + - "invalidating transaction [err=" + err + ", tx=" + tx + ']'); - - if (nearCheck) - return true; - - cctx.tm().salvageTx(tx); - } - - return true; - } - - return false; - } - - /** - * @param f Future. - * @return {@code True} if mini-future. - */ - private boolean isMini(IgniteFuture<?> f) { - return f.getClass().equals(MiniFuture.class); - } - - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCachePessimisticCheckCommittedTxFuture.class, this, "super", super.toString()); - } - - /** - * - */ - private class MiniFuture extends GridFutureAdapter<GridCacheCommittedTxInfo<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Mini future ID. */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Node ID. */ - private UUID nodeId; - - /** - * Empty constructor required by {@link Externalizable} - */ - public MiniFuture() { - // No-op. - } - - /** - * @param nodeId Node ID. 
- */ - private MiniFuture(UUID nodeId) { - super(cctx.kernalContext()); - - this.nodeId = nodeId; - } - - /** - * @return Node ID. - */ - private UUID nodeId() { - return nodeId; - } - - /** - * @return Future ID. - */ - private IgniteUuid futureId() { - return futId; - } - - /** - * @param e Error. - */ - private void onError(Throwable e) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - onDone(e); - } - - /** - */ - private void onNodeLeft() { - if (log.isDebugEnabled()) - log.debug("Transaction node left grid (will ignore) [fut=" + this + ']'); - - if (nearCheck) { - onDone(new ClusterTopologyException("Failed to check near transaction state (near node left grid): " + - nodeId)); - - return; - } - - onDone((GridCacheCommittedTxInfo<K, V>)null); - } - - /** - * @param res Result callback. - */ - private void onResult(GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - onDone(res.committedTxInfo()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MiniFuture.class, this, "done", isDone(), "err", error()); - } - } - - /** - * Single value reducer. - */ - private static class SingleReducer<K, V> implements - IgniteReducer<GridCacheCommittedTxInfo<K, V>, GridCacheCommittedTxInfo<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private AtomicReference<GridCacheCommittedTxInfo<K, V>> collected = new AtomicReference<>(); - - /** {@inheritDoc} */ - @Override public boolean collect(@Nullable GridCacheCommittedTxInfo<K, V> info) { - if (info != null) { - collected.compareAndSet(null, info); - - // Stop collecting on first collected info. 
- return false; - } - - return true; - } - - /** {@inheritDoc} */ - @Override public GridCacheCommittedTxInfo<K, V> reduce() { - return collected.get(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java deleted file mode 100644 index 835c4a2..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Message sent to check that transactions related to some pessimistic transaction - * were prepared on remote node. - */ -public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistributedBaseMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Future ID. */ - private IgniteUuid futId; - - /** Mini future ID. */ - private IgniteUuid miniId; - - /** Near transaction ID. */ - private GridCacheVersion nearXidVer; - - /** Originating node ID. */ - private UUID originatingNodeId; - - /** Originating thread ID. */ - private long originatingThreadId; - - /** Flag indicating that this is near-only check. */ - @GridDirectVersion(1) - private boolean nearOnlyCheck; - - /** - * Empty constructor required by {@link Externalizable} - */ - public GridCachePessimisticCheckCommittedTxRequest() { - // No-op. - } - - /** - * @param tx Transaction. - * @param originatingThreadId Originating thread ID. - * @param futId Future ID. - */ - public GridCachePessimisticCheckCommittedTxRequest(IgniteTxEx<K, V> tx, long originatingThreadId, IgniteUuid futId, - boolean nearOnlyCheck) { - super(tx.xidVersion(), 0); - - this.futId = futId; - this.nearOnlyCheck = nearOnlyCheck; - - nearXidVer = tx.nearXidVersion(); - originatingNodeId = tx.eventNodeId(); - this.originatingThreadId = originatingThreadId; - } - - /** - * @return Near version. - */ - public GridCacheVersion nearXidVersion() { - return nearXidVer; - } - - /** - * @return Tx originating node ID. 
- */ - public UUID originatingNodeId() { - return originatingNodeId; - } - - /** - * @return Tx originating thread ID. - */ - public long originatingThreadId() { - return originatingThreadId; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Mini future ID. - */ - public IgniteUuid miniId() { - return miniId; - } - - /** - * @param miniId Mini ID to set. - */ - public void miniId(IgniteUuid miniId) { - this.miniId = miniId; - } - - /** - * @return Flag indicating that this request was sent only to near node. If this flag is set, no finalizing - * will be executed on receiving (near) node since this is a user node. - */ - public boolean nearOnlyCheck() { - return nearOnlyCheck; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridCachePessimisticCheckCommittedTxRequest _clone = new GridCachePessimisticCheckCommittedTxRequest(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridCachePessimisticCheckCommittedTxRequest _clone = (GridCachePessimisticCheckCommittedTxRequest)_msg; - - _clone.futId = futId; - _clone.miniId = miniId; - _clone.nearXidVer = nearXidVer; - _clone.originatingNodeId = originatingNodeId; - _clone.originatingThreadId = originatingThreadId; - _clone.nearOnlyCheck = nearOnlyCheck; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 8: - if (!commState.putGridUuid(futId)) - return false; - - commState.idx++; - - case 9: - if (!commState.putGridUuid(miniId)) 
- return false; - - commState.idx++; - - case 10: - if (!commState.putCacheVersion(nearXidVer)) - return false; - - commState.idx++; - - case 11: - if (!commState.putUuid(originatingNodeId)) - return false; - - commState.idx++; - - case 12: - if (!commState.putLong(originatingThreadId)) - return false; - - commState.idx++; - - case 13: - if (!commState.putBoolean(nearOnlyCheck)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 8: - IgniteUuid futId0 = commState.getGridUuid(); - - if (futId0 == GRID_UUID_NOT_READ) - return false; - - futId = futId0; - - commState.idx++; - - case 9: - IgniteUuid miniId0 = commState.getGridUuid(); - - if (miniId0 == GRID_UUID_NOT_READ) - return false; - - miniId = miniId0; - - commState.idx++; - - case 10: - GridCacheVersion nearXidVer0 = commState.getCacheVersion(); - - if (nearXidVer0 == CACHE_VER_NOT_READ) - return false; - - nearXidVer = nearXidVer0; - - commState.idx++; - - case 11: - UUID originatingNodeId0 = commState.getUuid(); - - if (originatingNodeId0 == UUID_NOT_READ) - return false; - - originatingNodeId = originatingNodeId0; - - commState.idx++; - - case 12: - if (buf.remaining() < 8) - return false; - - originatingThreadId = commState.getLong(); - - commState.idx++; - - case 13: - if (buf.remaining() < 1) - return false; - - nearOnlyCheck = commState.getBoolean(); - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 20; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCachePessimisticCheckCommittedTxRequest.class, this, "super", super.toString()); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java deleted file mode 100644 index ef8b45e..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; - -/** - * Check prepared transactions response. - */ -public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDistributedBaseMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Future ID. */ - private IgniteUuid futId; - - /** Mini future ID. */ - private IgniteUuid miniId; - - /** Committed transaction info. */ - @GridDirectTransient - private GridCacheCommittedTxInfo<K, V> committedTxInfo; - - /** Serialized transaction info. */ - private byte[] committedTxInfoBytes; - - /** - * Empty constructor required by {@link Externalizable} - */ - public GridCachePessimisticCheckCommittedTxResponse() { - // No-op. - } - - /** - * @param txId Transaction ID. - * @param futId Future ID. - * @param miniId Mini future ID. - * @param committedTxInfo Committed transaction info. - */ - public GridCachePessimisticCheckCommittedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId, - @Nullable GridCacheCommittedTxInfo<K, V> committedTxInfo) { - super(txId, 0); - - this.futId = futId; - this.miniId = miniId; - this.committedTxInfo = committedTxInfo; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Mini future ID. - */ - public IgniteUuid miniId() { - return miniId; - } - - /** - * @return {@code True} if all remote transactions were prepared. 
- */ - public GridCacheCommittedTxInfo<K, V> committedTxInfo() { - return committedTxInfo; - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (committedTxInfo != null) { - marshalTx(committedTxInfo.recoveryWrites(), ctx); - - committedTxInfoBytes = ctx.marshaller().marshal(committedTxInfo); - } - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (committedTxInfoBytes != null) { - committedTxInfo = ctx.marshaller().unmarshal(committedTxInfoBytes, ldr); - - unmarshalTx(committedTxInfo.recoveryWrites(), false, ctx, ldr); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridCachePessimisticCheckCommittedTxResponse _clone = new GridCachePessimisticCheckCommittedTxResponse(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridCachePessimisticCheckCommittedTxResponse _clone = (GridCachePessimisticCheckCommittedTxResponse)_msg; - - _clone.futId = futId; - _clone.miniId = miniId; - _clone.committedTxInfo = committedTxInfo; - _clone.committedTxInfoBytes = committedTxInfoBytes; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 8: - if (!commState.putByteArray(committedTxInfoBytes)) - return false; - - commState.idx++; - - case 9: - if (!commState.putGridUuid(futId)) - 
return false; - - commState.idx++; - - case 10: - if (!commState.putGridUuid(miniId)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 8: - byte[] committedTxInfoBytes0 = commState.getByteArray(); - - if (committedTxInfoBytes0 == BYTE_ARR_NOT_READ) - return false; - - committedTxInfoBytes = committedTxInfoBytes0; - - commState.idx++; - - case 9: - IgniteUuid futId0 = commState.getGridUuid(); - - if (futId0 == GRID_UUID_NOT_READ) - return false; - - futId = futId0; - - commState.idx++; - - case 10: - IgniteUuid miniId0 = commState.getGridUuid(); - - if (miniId0 == GRID_UUID_NOT_READ) - return false; - - miniId = miniId0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 21; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCachePessimisticCheckCommittedTxResponse.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java deleted file mode 100644 index 4c60c61..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ /dev/null @@ -1,500 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.nio.*; -import java.util.*; - -/** - * Request carrying a new TTL plus the key bytes and versions of the cache and near-cache entries it applies to. - */ -public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { - /** Entries keys. */ - @GridToStringInclude - @GridDirectTransient - private List<K> keys; - - /** Keys bytes. */ - @GridDirectCollection(byte[].class) - private List<byte[]> keysBytes; - - /** Entries versions. */ - @GridDirectCollection(GridCacheVersion.class) - private List<GridCacheVersion> vers; - - /** Near entries keys. */ - @GridToStringInclude - @GridDirectTransient - private List<K> nearKeys; - - /** Near entries bytes. */ - @GridDirectCollection(byte[].class) - private List<byte[]> nearKeysBytes; - - /** Near entries versions. */ - @GridDirectCollection(GridCacheVersion.class) - private List<GridCacheVersion> nearVers; - - /** New TTL. */ - private long ttl; - - /** Topology version. */ - private long topVer; - - /** - * Required empty constructor. 
- */ - public GridCacheTtlUpdateRequest() { - // No-op. - } - - /** - * @param topVer Topology version. - * @param ttl TTL. - */ - public GridCacheTtlUpdateRequest(long topVer, long ttl) { - assert ttl >= 0 : ttl; - - this.topVer = topVer; - this.ttl = ttl; - } - - /** - * @return Topology version. - */ - public long topologyVersion() { - return topVer; - } - - /** - * @return TTL. - */ - public long ttl() { - return ttl; - } - - /** - * Adds an entry to the request; the key-bytes and version lists are created lazily on first use. - * - * @param keyBytes Key bytes. - * @param ver Version. - */ - public void addEntry(byte[] keyBytes, GridCacheVersion ver) { - if (keysBytes == null) { - keysBytes = new ArrayList<>(); - - vers = new ArrayList<>(); - } - - keysBytes.add(keyBytes); - - vers.add(ver); - } - - /** - * Adds a near cache entry to the request; the near key-bytes and version lists are created lazily on first use. - * - * @param keyBytes Key bytes. - * @param ver Version. - */ - public void addNearEntry(byte[] keyBytes, GridCacheVersion ver) { - if (nearKeysBytes == null) { - nearKeysBytes = new ArrayList<>(); - - nearVers = new ArrayList<>(); - } - - nearKeysBytes.add(keyBytes); - - nearVers.add(ver); - } - - /** - * @return Keys. - */ - public List<K> keys() { - return keys; - } - - /** - * @return Versions. - */ - public List<GridCacheVersion > versions() { - return vers; - } - - /** - * @param idx Entry index. - * @return Version. - */ - public GridCacheVersion version(int idx) { - assert idx >= 0 && idx < vers.size() : idx; - - return vers.get(idx); - } - - /** - * @return Keys for near cache. - */ - public List<K> nearKeys() { - return nearKeys; - } - - /** - * @return Versions for near cache entries. 
- */ - public List<GridCacheVersion > nearVersions() { - return nearVers; - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) - throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (keys == null && keysBytes != null) - keys = unmarshalCollection(keysBytes, ctx, ldr); - - if (nearKeys == null && nearKeysBytes != null) - nearKeys = unmarshalCollection(nearKeysBytes, ctx, ldr); - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 82; - } - - /** {@inheritDoc} */ - @SuppressWarnings("CloneDoesntCallSuperClone") - @Override public GridTcpCommunicationMessageAdapter clone() { - GridCacheTtlUpdateRequest _clone = new GridCacheTtlUpdateRequest(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - /* Cases fall through on purpose; commState.idx is advanced after each field, presumably so that a call which returned false can be resumed at the same field. A size of -1 is written for a null collection. */ - switch (commState.idx) { - case 3: - if (keysBytes != null) { - if (commState.it == null) { - if (!commState.putInt(keysBytes.size())) - return false; - - commState.it = keysBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 4: - if (nearKeysBytes != null) { - if (commState.it == null) { - if (!commState.putInt(nearKeysBytes.size())) - return false; - - commState.it = nearKeysBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return 
false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 5: - if (nearVers != null) { - if (commState.it == null) { - if (!commState.putInt(nearVers.size())) - return false; - - commState.it = nearVers.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 6: - if (!commState.putLong(topVer)) - return false; - - commState.idx++; - - case 7: - if (!commState.putLong(ttl)) - return false; - - commState.idx++; - - case 8: - if (vers != null) { - if (commState.it == null) { - if (!commState.putInt(vers.size())) - return false; - - commState.it = vers.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - /* Mirrors writeTo: fields are read in the same order, and a negative size means no collection is created. */ - switch (commState.idx) { - case 3: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (keysBytes == null) - keysBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - keysBytes.add((byte[])_val); - - 
commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 4: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (nearKeysBytes == null) - nearKeysBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - nearKeysBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 5: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (nearVers == null) - nearVers = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - GridCacheVersion _val = commState.getCacheVersion(); - - if (_val == CACHE_VER_NOT_READ) - return false; - - nearVers.add((GridCacheVersion)_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 6: - if (buf.remaining() < 8) - return false; - - topVer = commState.getLong(); - - commState.idx++; - - case 7: - if (buf.remaining() < 8) - return false; - - ttl = commState.getLong(); - - commState.idx++; - - case 8: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (vers == null) - vers = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - GridCacheVersion _val = commState.getCacheVersion(); - - if (_val == CACHE_VER_NOT_READ) - return false; - - vers.add((GridCacheVersion)_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - 
commState.readItems = 0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridCacheTtlUpdateRequest _clone = (GridCacheTtlUpdateRequest)_msg; - - _clone.keys = keys; - _clone.keysBytes = keysBytes; - _clone.vers = vers; - _clone.nearKeys = nearKeys; - _clone.nearKeysBytes = nearKeysBytes; - _clone.nearVers = nearVers; - _clone.ttl = ttl; - _clone.topVer = topVer; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheTtlUpdateRequest.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxCommitBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxCommitBuffer.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxCommitBuffer.java deleted file mode 100644 index a3f7676..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxCommitBuffer.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Buffer that stores transaction commit values in order to restore them in case of originating node crash. - */ -public interface GridCacheTxCommitBuffer<K, V> { - /** - * Adds committed transaction to commit buffer. - * - * @param tx Committed transaction. - */ - public void addCommittedTx(IgniteTxEx<K, V> tx); - - /** - * Gets transaction from commit buffer. - * - * @param originatingTxVer Originating tx version. - * @param nodeId Originating node ID. - * @param threadId Originating thread ID. - * @return Committed info, if any. - */ - @Nullable public GridCacheCommittedTxInfo<K, V> committedTx(GridCacheVersion originatingTxVer, UUID nodeId, - long threadId); - - /** - * Callback called when node left grid. Used to eventually clean up the queue of committed tx info from the - * left node. - * - * @param nodeId Left node ID. - */ - public void onNodeLeft(UUID nodeId); - - /** - * @return Buffer size. 
- */ - public int size(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java deleted file mode 100644 index 32cb8c0..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxFinishSync.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.future.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Synchronization structure for asynchronous waiting for near tx finish responses based on per-node per-thread - * basis. - */ -public class GridCacheTxFinishSync<K, V> { - /** Cache context. */ - private GridCacheSharedContext<K, V> cctx; - - /** Logger. */ - private IgniteLogger log; - - /** Per-thread sync objects keyed by thread ID. */ - private ConcurrentMap<Long, ThreadFinishSync> threadMap = new ConcurrentHashMap8<>(); - - /** - * @param cctx Cache context. - */ - public GridCacheTxFinishSync(GridCacheSharedContext<K, V> cctx) { - this.cctx = cctx; - - log = cctx.logger(GridCacheTxFinishSync.class); - } - - /** - * Callback invoked before finish request is sent to remote node. - * - * @param nodeId Node ID request being sent to. - * @param threadId Thread ID started transaction. - */ - public void onFinishSend(UUID nodeId, long threadId) { - ThreadFinishSync threadSync = threadMap.get(threadId); - - if (threadSync == null) - threadSync = F.addIfAbsent(threadMap, threadId, new ThreadFinishSync(threadId)); - - threadSync.onSend(nodeId); - } - - /** - * @param nodeId Node ID to wait ack from. - * @param threadId Thread ID to wait ack. - * @return {@code null} if ack was received or future that will be completed when ack is received. - */ - public IgniteFuture<?> awaitAckAsync(UUID nodeId, long threadId) { - ThreadFinishSync threadSync = threadMap.get(threadId); - - if (threadSync == null) - return null; - - return threadSync.awaitAckAsync(nodeId); - } - - /** - * Callback invoked when finish response is received from remote node. 
- * - * @param nodeId Node ID response was received from. - * @param threadId Thread ID started transaction. - */ - public void onAckReceived(UUID nodeId, long threadId) { - ThreadFinishSync threadSync = threadMap.get(threadId); - - if (threadSync != null) - threadSync.onReceive(nodeId); - } - - /** - * Callback invoked when node leaves grid. - * - * @param nodeId Left node ID. - */ - public void onNodeLeft(UUID nodeId) { - for (ThreadFinishSync threadSync : threadMap.values()) - threadSync.onNodeLeft(nodeId); - } - - /** - * Per-thread sync. Holds one {@link TxFinishSync} per remote node. - */ - private class ThreadFinishSync { - /** Thread ID. */ - private long threadId; - - /** Per-node sync objects keyed by node ID. */ - private final Map<UUID, TxFinishSync> nodeMap = new ConcurrentHashMap8<>(); - - /** - * @param threadId Thread ID. - */ - private ThreadFinishSync(long threadId) { - this.threadId = threadId; - } - - /** - * @param nodeId Node ID request being sent to. - */ - public void onSend(UUID nodeId) { - TxFinishSync sync = nodeMap.get(nodeId); - - if (sync == null) { - sync = new TxFinishSync(nodeId, threadId); - - TxFinishSync old = nodeMap.put(nodeId, sync); - - assert old == null : "Only user thread can add sync objects to the map."; - - // Recheck discovery only if added new sync. - if (cctx.discovery().node(nodeId) == null) { - sync.onNodeLeft(); - - nodeMap.remove(nodeId); - } - } - - sync.onSend(); - } - - /** - * Asynchronously awaits ack from node with given node ID. - * - * @param nodeId Node ID to wait ack from. - * @return {@code null} if ack has been received or future that will be completed when ack is received. - */ - public IgniteFuture<?> awaitAckAsync(UUID nodeId) { - TxFinishSync sync = nodeMap.get(nodeId); - - if (sync == null) - return null; - - return sync.awaitAckAsync(); - } - - /** - * @param nodeId Node ID response received from. - */ - public void onReceive(UUID nodeId) { - TxFinishSync sync = nodeMap.get(nodeId); - - if (sync != null) - sync.onReceive(); - } - - /** - * @param nodeId Left node ID. 
- */ - public void onNodeLeft(UUID nodeId) { - TxFinishSync sync = nodeMap.remove(nodeId); - - if (sync != null) - sync.onNodeLeft(); - } - } - - /** - * Tx sync. Allocated per-node per-thread. - */ - private class TxFinishSync { - /** Node ID. */ - private final UUID nodeId; - - /** Thread ID. */ - private final long threadId; - - /** Number of awaiting messages. */ - private int cnt; - - /** Node left flag. */ - private boolean nodeLeft; - - /** Pending await future. */ - private GridFutureAdapter<?> pendingFut; - - /** - * @param nodeId Sync node ID. Used to construct correct error message. - * @param threadId Thread ID. - */ - private TxFinishSync(UUID nodeId, long threadId) { - this.nodeId = nodeId; - this.threadId = threadId; - } - - /** - * Callback invoked before sending finish request to remote nodes. - * Does not block: asserts that no ack is pending (unless the node has left) and marks one ack as awaited. - */ - public void onSend() { - synchronized (this) { - if (log.isTraceEnabled()) - log.trace("Moved transaction synchronizer to waiting state [nodeId=" + nodeId + - ", threadId=" + threadId + ']'); - - assert cnt == 0 || nodeLeft; - - if (nodeLeft) - return; - - // Do not create future for every send operation. - cnt = 1; - } - } - - /** - * Asynchronously waits for ack to be received from node. - * - * @return {@code null} if ack has been received, or future that will be completed when ack is received. - */ - @Nullable public IgniteFuture<?> awaitAckAsync() { - synchronized (this) { - if (cnt == 0) - return null; - - if (nodeLeft) - return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to wait for finish synchronizer " + - "state (node left grid): " + nodeId)); - - if (pendingFut == null) { - if (log.isTraceEnabled()) - log.trace("Creating transaction synchronizer future [nodeId=" + nodeId + - ", threadId=" + threadId + ']'); - - pendingFut = new GridFutureAdapter<>(); - } - - return pendingFut; - } - } - - /** - * Callback for received response. 
- */ - public void onReceive() { - synchronized (this) { - if (log.isTraceEnabled()) - log.trace("Moving transaction synchronizer to completed state [nodeId=" + nodeId + - ", threadId=" + threadId + ']'); - - cnt = 0; - - if (pendingFut != null) { - pendingFut.onDone(); - - pendingFut = null; - } - } - } - - /** - * Callback for node leave event. - */ - public void onNodeLeft() { - synchronized (this) { - nodeLeft = true; - - if (pendingFut != null) { - pendingFut.onDone(new IgniteCheckedException("Failed to wait for transaction synchronizer " + - "completed state (node left grid): " + nodeId)); - - pendingFut = null; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedBaseMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedBaseMessage.java deleted file mode 100644 index 55ee372..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedBaseMessage.java +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Base for all messages in replicated cache. - */ -public abstract class GridDistributedBaseMessage<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable, - GridCacheVersionable { - /** */ - private static final long serialVersionUID = 0L; - - /** Lock or transaction version. */ - @GridToStringInclude - protected GridCacheVersion ver; - - /** - * Candidates for every key ordered in the order of keys. These - * can be either local-only candidates in case of lock acquisition, - * or pending candidates in case of transaction commit. - */ - @GridToStringInclude - @GridDirectTransient - private Collection<GridCacheMvccCandidate<K>>[] candsByIdx; - - /** Candidates by index in serialized form. */ - @GridToStringExclude - private byte[] candsByIdxBytes; - - /** Collections of local lock candidates. */ - @GridToStringInclude - @GridDirectTransient - private Map<K, Collection<GridCacheMvccCandidate<K>>> candsByKey; - - /** Collections of local lock candidates in serialized form. 
*/ - @GridToStringExclude - private byte[] candsByKeyBytes; - - /** Committed versions with order higher than one for this message (needed for commit ordering). */ - @GridToStringInclude - @GridDirectCollection(GridCacheVersion.class) - private Collection<GridCacheVersion> committedVers; - - /** Rolled back versions with order higher than one for this message (needed for commit ordering). */ - @GridToStringInclude - @GridDirectCollection(GridCacheVersion.class) - private Collection<GridCacheVersion> rolledbackVers; - - /** Count of keys referenced in candidates array (needed only locally for optimization). */ - @GridToStringInclude - @GridDirectTransient - private int cnt; - - /** - * Empty constructor required by {@link Externalizable}. - */ - protected GridDistributedBaseMessage() { - /* No-op. */ - } - - /** - * @param cnt Count of keys referenced in list of candidates. - */ - protected GridDistributedBaseMessage(int cnt) { - assert cnt >= 0; - - this.cnt = cnt; - } - - /** - * @param ver Either lock or transaction version. - * @param cnt Key count. 
- */ - protected GridDistributedBaseMessage(GridCacheVersion ver, int cnt) { - this(cnt); - - assert ver != null; - - this.ver = ver; - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (candsByIdx != null) - candsByIdxBytes = ctx.marshaller().marshal(candsByIdx); - - if (candsByKey != null) { - if (ctx.deploymentEnabled()) { - for (K key : candsByKey.keySet()) - prepareObject(key, ctx); - } - - candsByKeyBytes = CU.marshal(ctx, candsByKey); - } - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (candsByIdxBytes != null) - candsByIdx = ctx.marshaller().unmarshal(candsByIdxBytes, ldr); - - if (candsByKeyBytes != null) - candsByKey = ctx.marshaller().unmarshal(candsByKeyBytes, ldr); - } - - /** - * @return Version. - */ - @Override public GridCacheVersion version() { - return ver; - } - - /** - * @param ver Version. - */ - public void version(GridCacheVersion ver) { - this.ver = ver; - } - - /** - * @param committedVers Committed versions. - * @param rolledbackVers Rolled back versions. - */ - public void completedVersions(Collection<GridCacheVersion> committedVers, - Collection<GridCacheVersion> rolledbackVers) { - this.committedVers = committedVers; - this.rolledbackVers = rolledbackVers; - } - - /** - * @return Committed versions. - */ - public Collection<GridCacheVersion> committedVersions() { - return committedVers == null ? Collections.<GridCacheVersion>emptyList() : committedVers; - } - - /** - * @return Rolled back versions. - */ - public Collection<GridCacheVersion> rolledbackVersions() { - return rolledbackVers == null ? Collections.<GridCacheVersion>emptyList() : rolledbackVers; - } - - /** - * @param idx Key index. - * @param candsByIdx List of candidates for that key. 
- */ - @SuppressWarnings({"unchecked"}) - public void candidatesByIndex(int idx, Collection<GridCacheMvccCandidate<K>> candsByIdx) { - assert idx < cnt; - - // If nothing to add. - if (candsByIdx == null || candsByIdx.isEmpty()) - return; - - if (this.candsByIdx == null) - this.candsByIdx = new Collection[cnt]; - - this.candsByIdx[idx] = candsByIdx; - } - - /** - * @param idx Key index. - * @return Candidates for given key. - */ - public Collection<GridCacheMvccCandidate<K>> candidatesByIndex(int idx) { - return candsByIdx == null || candsByIdx[idx] == null ? Collections.<GridCacheMvccCandidate<K>>emptyList() : candsByIdx[idx]; - } - - /** - * @param key Candidates key. - * @param candsByKey Collection of local candidates. - */ - public void candidatesByKey(K key, Collection<GridCacheMvccCandidate<K>> candsByKey) { - if (this.candsByKey == null) - this.candsByKey = new HashMap<>(1, 1.0f); - - this.candsByKey.put(key, candsByKey); - } - - /** - * - * @param key Candidates key. - * @return Collection of lock candidates for given key. - */ - @Nullable public Collection<GridCacheMvccCandidate<K>> candidatesByKey(K key) { - assert key != null; - - if (candsByKey == null) - return null; - - return candsByKey.get(key); - } - - /** - * @return Map of candidates. - */ - public Map<K, Collection<GridCacheMvccCandidate<K>>> candidatesByKey() { - return candsByKey == null ? Collections.<K, Collection<GridCacheMvccCandidate<K>>>emptyMap() : candsByKey; - } - - /** - * @return Count of keys referenced in candidates array (needed only locally for optimization). 
- */ - public int keysCount() { - return cnt; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDistributedBaseMessage _clone = (GridDistributedBaseMessage)_msg; - - _clone.ver = ver; - _clone.candsByIdx = candsByIdx; - _clone.candsByIdxBytes = candsByIdxBytes; - _clone.candsByKey = candsByKey; - _clone.candsByKeyBytes = candsByKeyBytes; - _clone.committedVers = committedVers; - _clone.rolledbackVers = rolledbackVers; - _clone.cnt = cnt; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - /* Cases fall through on purpose; commState.idx is advanced after each field, presumably so that a call which returned false can be resumed at the same field. A size of -1 is written for a null collection. */ - switch (commState.idx) { - case 3: - if (!commState.putByteArray(candsByIdxBytes)) - return false; - - commState.idx++; - - case 4: - if (!commState.putByteArray(candsByKeyBytes)) - return false; - - commState.idx++; - - case 5: - if (committedVers != null) { - if (commState.it == null) { - if (!commState.putInt(committedVers.size())) - return false; - - commState.it = committedVers.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 6: - if (rolledbackVers != null) { - if (commState.it == null) { - if (!commState.putInt(rolledbackVers.size())) - return false; - - commState.it = rolledbackVers.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) - return false; - 
- commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 7: - if (!commState.putCacheVersion(ver)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 3: - byte[] candsByIdxBytes0 = commState.getByteArray(); - - if (candsByIdxBytes0 == BYTE_ARR_NOT_READ) - return false; - - candsByIdxBytes = candsByIdxBytes0; - - commState.idx++; - - case 4: - byte[] candsByKeyBytes0 = commState.getByteArray(); - - if (candsByKeyBytes0 == BYTE_ARR_NOT_READ) - return false; - - candsByKeyBytes = candsByKeyBytes0; - - commState.idx++; - - case 5: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (committedVers == null) - committedVers = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - GridCacheVersion _val = commState.getCacheVersion(); - - if (_val == CACHE_VER_NOT_READ) - return false; - - committedVers.add((GridCacheVersion)_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 6: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (rolledbackVers == null) - rolledbackVers = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - GridCacheVersion _val = commState.getCacheVersion(); - - if (_val == CACHE_VER_NOT_READ) - return false; - - rolledbackVers.add((GridCacheVersion)_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 
0; - - commState.idx++; - - case 7: - GridCacheVersion ver0 = commState.getCacheVersion(); - - if (ver0 == CACHE_VER_NOT_READ) - return false; - - ver = ver0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDistributedBaseMessage.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java deleted file mode 100644 index ace4dd9..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Distributed cache implementation. - * <p> - * Abstract extension of {@link GridCacheAdapter} that routes both transactional lock requests - * ({@code txLockAsync}) and explicit lock requests (the three-argument {@code lockAllAsync}) into a single - * abstract {@code lockAllAsync} overload implemented by subclasses, and offers removal of an entry by version. - */ -public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter<K, V> { - /** Serial version UID. */ - private static final long serialVersionUID = 0L; - - /** - * Empty constructor required by {@link Externalizable}. - */ - protected GridDistributedCacheAdapter() { - // No-op. - } - - /** - * Passes both arguments straight to the superclass constructor. - * - * @param ctx Cache context. - * @param startSize Start size. - */ - protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, int startSize) { - super(ctx, startSize); - } - - /** - * Passes both arguments straight to the superclass constructor. - * - * @param ctx Cache context. - * @param map Cache map. - */ - protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { - super(ctx, map); - } - - /** - * Asserts that {@code tx} is not null and delegates to the abstract {@code lockAllAsync} overload. - * Note the differing parameter order: this method takes {@code isRead, retval, isolation, isInvalidate}, - * while the delegate takes {@code isInvalidate, isRead, retval, isolation}. - * - * {@inheritDoc} - */ - @Override public IgniteFuture<Boolean> txLockAsync( - Collection<? extends K> keys, - long timeout, - IgniteTxLocalEx<K, V> tx, - boolean isRead, - boolean retval, - IgniteTxIsolation isolation, - boolean isInvalidate, - long accessTtl, - IgnitePredicate<GridCacheEntry<K, V>>[] filter - ) { - assert tx != null; - - return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, accessTtl, filter); - } - - /** - * Delegates to the abstract {@code lockAllAsync} overload using the transaction returned by - * {@code ctx.tm().userTxx()}, with {@code isInvalidate = false}, {@code isRead = false}, - * {@code retval = true}, a {@code null} isolation and an access TTL of {@code -1L}. - * - * {@inheritDoc} - */ - @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - IgnitePredicate<GridCacheEntry<K, V>>... filter) { - IgniteTxLocalEx<K, V> tx = ctx.tm().userTxx(); - - // Return value flag is true because we choose to bring values for explicit locks. 
- return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, -1L, filter); - } - - /** - * Lock entry point implemented by subclasses; both {@code txLockAsync} and the three-argument - * {@code lockAllAsync} in this class delegate here. - * - * @param keys Keys to lock. - * @param timeout Timeout. - * @param tx Transaction, may be {@code null}. - * @param isInvalidate Invalidation flag. - * @param isRead Indicates whether value is read or written. - * @param retval Flag to return value. - * @param isolation Transaction isolation, may be {@code null}. - * @param accessTtl TTL for read operation. - * @param filter Optional filter. - * @return Future for locks. - */ - protected abstract IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, - long timeout, - @Nullable IgniteTxLocalEx<K, V> tx, - boolean isInvalidate, - boolean isRead, - boolean retval, - @Nullable IgniteTxIsolation isolation, - long accessTtl, - IgnitePredicate<GridCacheEntry<K, V>>[] filter); - - /** - * Looks the entry up with {@code peekEx(key)} and, if one is found and - * {@code markObsoleteVersion(ver)} returns {@code true}, removes it via {@code removeEntry}. - * Does nothing when no entry is found. - * - * @param key Key to remove. - * @param ver Version to remove. - */ - public void removeVersionedEntry(K key, GridCacheVersion ver) { - GridCacheEntryEx<K, V> entry = peekEx(key); - - if (entry == null) - return; - - if (entry.markObsoleteVersion(ver)) - removeEntry(entry); - } - - /** - * Delegates to {@code S.toString(...)} for {@code GridDistributedCacheAdapter}, passing the superclass - * string under the name {@code "super"}. - * - * {@inheritDoc} - */ - @Override public String toString() { - return S.toString(GridDistributedCacheAdapter.class, this, "super", super.toString()); - } -}