http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java deleted file mode 100644 index e704006..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ /dev/null @@ -1,702 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gridgain.grid.kernal.processors.cache.distributed.dht;

import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.nio.*;
import java.util.*;

/**
 * DHT transaction finish request.
 * <p>
 * On top of the state carried by {@link GridDistributedTxFinishRequest} this message holds
 * the near node ID, the near cache writes (kept both as entries and in marshalled form),
 * pending lock versions, the one-phase-commit write version and optional per-entry TTL lists.
 */
public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Near node ID. */
    private UUID nearNodeId;

    /** Transaction isolation. */
    private IgniteTxIsolation isolation;

    /** Near writes. */
    @GridToStringInclude
    @GridDirectTransient
    private Collection<IgniteTxEntry<K, V>> nearWrites;

    /** Serialized near writes. */
    @GridDirectCollection(byte[].class)
    private Collection<byte[]> nearWritesBytes;

    /** Mini future ID. */
    private IgniteUuid miniId;

    /** System invalidation flag. */
    private boolean sysInvalidate;

    /** Topology version. */
    private long topVer;

    /** Pending versions with order less than one for this message (needed for commit ordering). */
    @GridToStringInclude
    @GridDirectCollection(GridCacheVersion.class)
    private Collection<GridCacheVersion> pendingVers;

    /** One phase commit flag for fast-commit path. */
    private boolean onePhaseCommit;

    /** One phase commit write version. */
    private GridCacheVersion writeVer;

    /** Subject ID. */
    @GridDirectVersion(1)
    private UUID subjId;

    /** Task name hash. */
    @GridDirectVersion(2)
    private int taskNameHash;

    /** TTLs for optimistic transaction. */
    private GridLongList ttls;

    /** Near cache TTLs for optimistic transaction. */
    private GridLongList nearTtls;

    /**
     * Empty constructor required for {@link Externalizable}.
     */
    public GridDhtTxFinishRequest() {
        // No-op.
    }

    /**
     * @param nearNodeId Near node ID.
     * @param futId Future ID.
     * @param miniId Mini future ID.
     * @param topVer Topology version.
     * @param xidVer Transaction ID.
     * @param commitVer Commit version.
     * @param threadId Thread ID.
     * @param isolation Transaction isolation.
     * @param commit Commit flag.
     * @param invalidate Invalidate flag.
     * @param sys System flag.
     * @param sysInvalidate System invalidation flag.
     * @param syncCommit Synchronous commit flag.
     * @param syncRollback Synchronous rollback flag.
     * @param baseVer Base version.
     * @param committedVers Committed versions.
     * @param rolledbackVers Rolled back versions.
     * @param pendingVers Pending versions.
     * @param txSize Expected transaction size.
     * @param writes Write entries.
     * @param nearWrites Near cache writes.
     * @param recoverWrites Recovery write entries.
     * @param onePhaseCommit One phase commit flag.
     * @param grpLockKey Group lock key.
     * @param subjId Subject ID.
     * @param taskNameHash Task name hash.
     */
    public GridDhtTxFinishRequest(
        UUID nearNodeId,
        IgniteUuid futId,
        IgniteUuid miniId,
        long topVer,
        GridCacheVersion xidVer,
        GridCacheVersion commitVer,
        long threadId,
        IgniteTxIsolation isolation,
        boolean commit,
        boolean invalidate,
        boolean sys,
        boolean sysInvalidate,
        boolean syncCommit,
        boolean syncRollback,
        GridCacheVersion baseVer,
        Collection<GridCacheVersion> committedVers,
        Collection<GridCacheVersion> rolledbackVers,
        Collection<GridCacheVersion> pendingVers,
        int txSize,
        Collection<IgniteTxEntry<K, V>> writes,
        Collection<IgniteTxEntry<K, V>> nearWrites,
        Collection<IgniteTxEntry<K, V>> recoverWrites,
        boolean onePhaseCommit,
        @Nullable IgniteTxKey grpLockKey,
        @Nullable UUID subjId,
        int taskNameHash
    ) {
        super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer,
            committedVers, rolledbackVers, txSize, writes, recoverWrites, grpLockKey);

        assert miniId != null;
        assert nearNodeId != null;
        assert isolation != null;

        this.pendingVers = pendingVers;
        this.topVer = topVer;
        this.nearNodeId = nearNodeId;
        this.isolation = isolation;
        this.nearWrites = nearWrites;
        this.miniId = miniId;
        this.sysInvalidate = sysInvalidate;
        this.onePhaseCommit = onePhaseCommit;
        this.subjId = subjId;
        this.taskNameHash = taskNameHash;
    }

    /** {@inheritDoc} */
    @Override public boolean allowForStartup() {
        return true;
    }

    /**
     * @return Near writes, or an empty list if none were set (never {@code null}).
     */
    public Collection<IgniteTxEntry<K, V>> nearWrites() {
        return nearWrites == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : nearWrites;
    }

    /**
     * @return Mini ID.
     */
    public IgniteUuid miniId() {
        return miniId;
    }

    /**
     * @return Subject ID.
     */
    @Nullable public UUID subjectId() {
        return subjId;
    }

    /**
     * @return Task name hash.
     */
    public int taskNameHash() {
        return taskNameHash;
    }

    /**
     * @return Transaction isolation.
     */
    public IgniteTxIsolation isolation() {
        return isolation;
    }

    /**
     * @return Near node ID.
     */
    public UUID nearNodeId() {
        return nearNodeId;
    }

    /**
     * @return System invalidate flag.
     */
    public boolean isSystemInvalidate() {
        return sysInvalidate;
    }

    /**
     * @return One phase commit flag.
     */
    public boolean onePhaseCommit() {
        return onePhaseCommit;
    }

    /**
     * @return Write version for one-phase commit transactions.
     */
    public GridCacheVersion writeVersion() {
        return writeVer;
    }

    /**
     * @param writeVer Write version for one-phase commit transactions.
     */
    public void writeVersion(GridCacheVersion writeVer) {
        this.writeVer = writeVer;
    }

    /**
     * @return Topology version.
     */
    @Override public long topologyVersion() {
        return topVer;
    }

    /**
     * Gets versions of not acquired locks with version less than one of transaction being committed.
     *
     * @return Versions of locks for entries participating in transaction that have not been acquired yet
     *      and have version less than one of transaction being committed (never {@code null}).
     */
    public Collection<GridCacheVersion> pendingVersions() {
        return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
    }

    /**
     * Records the TTL of the write entry at position {@code idx}.
     * <p>
     * The list is created lazily on the first TTL different from {@code -1}; at that moment the
     * slots of the preceding entries are back-filled with {@code -1} so that list positions line
     * up with entry positions. Once the list exists every call appends exactly one value, so this
     * method is expected to be invoked once per entry, in entry order.
     *
     * @param idx Entry index (taken to be zero-based).
     * @param ttl TTL, {@code -1} if the entry has no explicit TTL.
     */
    public void ttl(int idx, long ttl) {
        ttls = appendTtl(ttls, idx, ttl);
    }

    /**
     * @return TTLs for optimistic transaction, {@code null} if no explicit TTL was recorded.
     */
    public GridLongList ttls() {
        return ttls;
    }

    /**
     * Records the TTL of the near cache write entry at position {@code idx}. Follows the same
     * lazy-creation and back-filling rules as {@link #ttl(int, long)}.
     *
     * @param idx Entry index (taken to be zero-based).
     * @param ttl TTL, {@code -1} if the entry has no explicit TTL.
     */
    public void nearTtl(int idx, long ttl) {
        nearTtls = appendTtl(nearTtls, idx, ttl);
    }

    /**
     * @return Near cache TTLs for optimistic transaction, {@code null} if no explicit TTL was recorded.
     */
    public GridLongList nearTtls() {
        return nearTtls;
    }

    /**
     * Appends {@code ttl} to a lazily created TTL list, keeping list positions aligned with
     * entry indexes.
     *
     * @param list Current list, {@code null} if no explicit TTL has been seen so far.
     * @param idx Index of the entry the TTL belongs to.
     * @param ttl TTL value, {@code -1} meaning "no explicit TTL".
     * @return List to store back into the field; still {@code null} if nothing had to be recorded.
     */
    @Nullable private static GridLongList appendTtl(@Nullable GridLongList list, int idx, long ttl) {
        if (list == null) {
            // Stay lazy while only "no TTL" markers are seen.
            if (ttl == -1L)
                return null;

            list = new GridLongList();

            // Back-fill entries [0, idx) so that the value appended below lands exactly in slot idx.
            // (Padding only idx - 1 slots, as was done before, shifted every TTL one entry to the left.)
            for (int i = 0; i < idx; i++)
                list.add(-1L);
        }

        list.add(ttl);

        return list;
    }

    /** {@inheritDoc} */
    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
        super.prepareMarshal(ctx);

        if (nearWrites != null) {
            marshalTx(nearWrites, ctx);

            nearWritesBytes = new ArrayList<>(nearWrites.size());

            // Each near write is marshalled separately; the bytes are what actually goes on the wire.
            for (IgniteTxEntry<K, V> e : nearWrites)
                nearWritesBytes.add(ctx.marshaller().marshal(e));
        }
    }

    /** {@inheritDoc} */
    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
        super.finishUnmarshal(ctx, ldr);

        if (nearWritesBytes != null) {
            nearWrites = new ArrayList<>(nearWritesBytes.size());

            for (byte[] arr : nearWritesBytes)
                nearWrites.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));

            unmarshalTx(nearWrites, true, ctx, ldr);
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridDhtTxFinishRequest _clone = new GridDhtTxFinishRequest();

        clone0(_clone);

        return _clone;
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        GridDhtTxFinishRequest _clone = (GridDhtTxFinishRequest)_msg;

        // Shallow copy: collections and lists are shared with the original message.
        _clone.nearNodeId = nearNodeId;
        _clone.isolation = isolation;
        _clone.nearWrites = nearWrites;
        _clone.nearWritesBytes = nearWritesBytes;
        _clone.miniId = miniId;
        _clone.sysInvalidate = sysInvalidate;
        _clone.topVer = topVer;
        _clone.pendingVers = pendingVers;
        _clone.onePhaseCommit = onePhaseCommit;
        _clone.writeVer = writeVer;
        _clone.subjId = subjId;
        _clone.taskNameHash = taskNameHash;
        _clone.ttls = ttls;
        _clone.nearTtls = nearTtls;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Resumable writer: {@code commState.idx} remembers the next field to write and the switch
     * below falls through on purpose, so a call that returned {@code false} (a put did not
     * succeed) continues from the same field on the next invocation.
     */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.writeTo(buf))
            return false;

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        switch (commState.idx) {
            case 21:
                if (!commState.putEnum(isolation))
                    return false;

                commState.idx++;

            case 22:
                if (!commState.putGridUuid(miniId))
                    return false;

                commState.idx++;

            case 23:
                if (!commState.putUuid(nearNodeId))
                    return false;

                commState.idx++;

            case 24:
                if (nearWritesBytes != null) {
                    if (commState.it == null) {
                        if (!commState.putInt(nearWritesBytes.size()))
                            return false;

                        commState.it = nearWritesBytes.iterator();
                    }

                    // commState.cur keeps the element whose write did not complete, so it is retried first.
                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putByteArray((byte[])commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                } else {
                    // -1 size marks a null collection.
                    if (!commState.putInt(-1))
                        return false;
                }

                commState.idx++;

            case 25:
                if (!commState.putBoolean(onePhaseCommit))
                    return false;

                commState.idx++;

            case 26:
                if (pendingVers != null) {
                    if (commState.it == null) {
                        if (!commState.putInt(pendingVers.size()))
                            return false;

                        commState.it = pendingVers.iterator();
                    }

                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                } else {
                    // -1 size marks a null collection.
                    if (!commState.putInt(-1))
                        return false;
                }

                commState.idx++;

            case 27:
                if (!commState.putBoolean(sysInvalidate))
                    return false;

                commState.idx++;

            case 28:
                if (!commState.putLong(topVer))
                    return false;

                commState.idx++;

            case 29:
                if (!commState.putCacheVersion(writeVer))
                    return false;

                commState.idx++;

            case 30:
                if (!commState.putUuid(subjId))
                    return false;

                commState.idx++;

            case 31:
                if (!commState.putInt(taskNameHash))
                    return false;

                commState.idx++;

            case 32:
                if (!commState.putLongList(ttls))
                    return false;

                commState.idx++;

            case 33:
                if (!commState.putLongList(nearTtls))
                    return false;

                commState.idx++;

        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Resumable reader mirroring {@link #writeTo(ByteBuffer)}: fields are read in the same order,
     * the switch falls through on purpose and {@code false} is returned as soon as a field cannot
     * be read from the supplied buffer, leaving {@code commState} positioned on that field.
     */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.readFrom(buf))
            return false;

        switch (commState.idx) {
            case 21:
                if (buf.remaining() < 1)
                    return false;

                byte isolation0 = commState.getByte();

                isolation = IgniteTxIsolation.fromOrdinal(isolation0);

                commState.idx++;

            case 22:
                IgniteUuid miniId0 = commState.getGridUuid();

                if (miniId0 == GRID_UUID_NOT_READ)
                    return false;

                miniId = miniId0;

                commState.idx++;

            case 23:
                UUID nearNodeId0 = commState.getUuid();

                if (nearNodeId0 == UUID_NOT_READ)
                    return false;

                nearNodeId = nearNodeId0;

                commState.idx++;

            case 24:
                // readSize == -1 means the collection size has not been read yet.
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (nearWritesBytes == null)
                        nearWritesBytes = new ArrayList<>(commState.readSize);

                    // readItems lets an interrupted read resume from the first element not yet read.
                    for (int i = commState.readItems; i < commState.readSize; i++) {
                        byte[] _val = commState.getByteArray();

                        if (_val == BYTE_ARR_NOT_READ)
                            return false;

                        nearWritesBytes.add((byte[])_val);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

            case 25:
                if (buf.remaining() < 1)
                    return false;

                onePhaseCommit = commState.getBoolean();

                commState.idx++;

            case 26:
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (pendingVers == null)
                        pendingVers = new ArrayList<>(commState.readSize);

                    for (int i = commState.readItems; i < commState.readSize; i++) {
                        GridCacheVersion _val = commState.getCacheVersion();

                        if (_val == CACHE_VER_NOT_READ)
                            return false;

                        pendingVers.add((GridCacheVersion)_val);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

            case 27:
                if (buf.remaining() < 1)
                    return false;

                sysInvalidate = commState.getBoolean();

                commState.idx++;

            case 28:
                if (buf.remaining() < 8)
                    return false;

                topVer = commState.getLong();

                commState.idx++;

            case 29:
                GridCacheVersion writeVer0 = commState.getCacheVersion();

                if (writeVer0 == CACHE_VER_NOT_READ)
                    return false;

                writeVer = writeVer0;

                commState.idx++;

            case 30:
                UUID subjId0 = commState.getUuid();

                if (subjId0 == UUID_NOT_READ)
                    return false;

                subjId = subjId0;

                commState.idx++;

            case 31:
                if (buf.remaining() < 4)
                    return false;

                taskNameHash = commState.getInt();

                commState.idx++;

            case 32:
                GridLongList ttls0 = commState.getLongList();

                if (ttls0 == LONG_LIST_NOT_READ)
                    return false;

                ttls = ttls0;

                commState.idx++;

            case 33:
                GridLongList nearTtls0 = commState.getLongList();

                if (nearTtls0 == LONG_LIST_NOT_READ)
                    return false;

                nearTtls = nearTtls0;

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 31;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java deleted file mode 100644 index a086789..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java +++ /dev/null @@ -1,145 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gridgain.grid.kernal.processors.cache.distributed.dht;

import org.apache.ignite.lang.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;
import java.nio.*;

/**
 * Response to a DHT transaction finish request. Extends the distributed finish response
 * with the ID of the mini future the response is addressed to.
 */
public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishResponse<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Mini future ID. */
    private IgniteUuid miniId;

    /**
     * Empty constructor required by {@link Externalizable}.
     */
    public GridDhtTxFinishResponse() {
        // No-op.
    }

    /**
     * @param xid Xid version.
     * @param futId Future ID.
     * @param miniId Mini future ID (must not be {@code null}).
     */
    public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
        super(xid, futId);

        assert miniId != null;

        this.miniId = miniId;
    }

    /**
     * @return Mini future ID.
     */
    public IgniteUuid miniId() {
        return miniId;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        String parentStr = super.toString();

        return S.toString(GridDhtTxFinishResponse.class, this, parentStr);
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridDhtTxFinishResponse copy = new GridDhtTxFinishResponse();

        clone0(copy);

        return copy;
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        // The only state added by this class is the mini future ID.
        ((GridDhtTxFinishResponse)_msg).miniId = miniId;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        boolean parentWritten = super.writeTo(buf);

        if (!parentWritten)
            return false;

        if (!commState.typeWritten) {
            boolean typePut = commState.putByte(directType());

            if (!typePut)
                return false;

            commState.typeWritten = true;
        }

        // Field #5 is the single field this class adds to the message.
        if (commState.idx == 5) {
            boolean idPut = commState.putGridUuid(miniId);

            if (!idPut)
                return false;

            commState.idx++;
        }

        return true;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        boolean parentRead = super.readFrom(buf);

        if (!parentRead)
            return false;

        // Field #5 is the single field this class adds to the message.
        if (commState.idx == 5) {
            IgniteUuid readId = commState.getGridUuid();

            // Sentinel instance signals that the value could not be read yet.
            if (readId == GRID_UUID_NOT_READ)
                return false;

            miniId = readId;

            commState.idx++;
        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 32;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java deleted file mode 100644 index 5036071..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ /dev/null @@ -1,656 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.transactions.IgniteTxState.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; - -/** - * Replicated user transaction. - */ -public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements GridCacheMappedVersion { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private UUID nearNodeId; - - /** Near future ID. */ - private IgniteUuid nearFutId; - - /** Near future ID. */ - private IgniteUuid nearMiniId; - - /** Near future ID. */ - private IgniteUuid nearFinFutId; - - /** Near future ID. */ - private IgniteUuid nearFinMiniId; - - /** Near XID. */ - private GridCacheVersion nearXidVer; - - /** Transaction nodes mapping (primary node -> related backup nodes). */ - private Map<UUID, Collection<UUID>> txNodes; - - /** Future. */ - @GridToStringExclude - private final AtomicReference<GridDhtTxPrepareFuture<K, V>> prepFut = - new AtomicReference<>(); - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtTxLocal() { - // No-op. - } - - /** - * @param nearNodeId Near node ID that initiated transaction. 
- * @param nearXidVer Near transaction ID. - * @param nearFutId Near future ID. - * @param nearMiniId Near mini future ID. - * @param nearThreadId Near thread ID. - * @param implicit Implicit flag. - * @param implicitSingle Implicit-with-single-key flag. - * @param cctx Cache context. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - * @param storeEnabled Store enabled flag. - * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param partLock {@code True} if this is a group-lock transaction and whole partition should be locked. - * @param txNodes Transaction nodes mapping. - */ - public GridDhtTxLocal( - GridCacheSharedContext<K, V> cctx, - UUID nearNodeId, - GridCacheVersion nearXidVer, - IgniteUuid nearFutId, - IgniteUuid nearMiniId, - long nearThreadId, - boolean implicit, - boolean implicitSingle, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - boolean invalidate, - boolean storeEnabled, - int txSize, - @Nullable IgniteTxKey grpLockKey, - boolean partLock, - Map<UUID, Collection<UUID>> txNodes, - UUID subjId, - int taskNameHash - ) { - super( - cctx, - cctx.versions().onReceivedAndNext(nearNodeId, nearXidVer), - implicit, - implicitSingle, - sys, - concurrency, - isolation, - timeout, - invalidate, - storeEnabled, - txSize, - grpLockKey, - partLock, - subjId, - taskNameHash); - - assert cctx != null; - assert nearNodeId != null; - assert nearFutId != null; - assert nearMiniId != null; - assert nearXidVer != null; - - this.nearNodeId = nearNodeId; - this.nearXidVer = nearXidVer; - this.nearFutId = nearFutId; - this.nearMiniId = nearMiniId; - this.txNodes = txNodes; - - threadId = nearThreadId; - - assert !F.eq(xidVer, nearXidVer); - } - - /** {@inheritDoc} */ - @Override public Map<UUID, Collection<UUID>> transactionNodes() { - return txNodes; - } - - /** {@inheritDoc} */ - @Override public UUID 
eventNodeId() { - return nearNodeId; - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> masterNodeIds() { - assert nearNodeId != null; - - return Collections.singleton(nearNodeId); - } - - /** {@inheritDoc} */ - @Override public UUID otherNodeId() { - assert nearNodeId != null; - - return nearNodeId; - } - - /** {@inheritDoc} */ - @Override public UUID originatingNodeId() { - return nearNodeId; - } - - /** {@inheritDoc} */ - @Override protected UUID nearNodeId() { - return nearNodeId; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion nearXidVersion() { - return nearXidVer; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion mappedVersion() { - return nearXidVer; - } - - /** {@inheritDoc} */ - @Override protected IgniteUuid nearFutureId() { - return nearFutId; - } - - /** {@inheritDoc} */ - @Override protected IgniteUuid nearMiniId() { - return nearMiniId; - } - - /** {@inheritDoc} */ - @Override public boolean dht() { - return true; - } - - /** {@inheritDoc} */ - @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) { - return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId()); - } - - /** - * @return Near future ID. - */ - public IgniteUuid nearFinishFutureId() { - return nearFinFutId; - } - - /** - * @param nearFinFutId Near future ID. - */ - public void nearFinishFutureId(IgniteUuid nearFinFutId) { - this.nearFinFutId = nearFinFutId; - } - - /** - * @return Near future mini ID. - */ - public IgniteUuid nearFinishMiniId() { - return nearFinMiniId; - } - - /** - * @param nearFinMiniId Near future mini ID. - */ - public void nearFinishMiniId(IgniteUuid nearFinMiniId) { - this.nearFinMiniId = nearFinMiniId; - } - - /** {@inheritDoc} */ - @Override @Nullable protected IgniteFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, - IgniteTxEntry<K, V> entry, long topVer) { - // Don't add local node as reader. 
- if (!cctx.localNodeId().equals(nearNodeId)) { - GridCacheContext<K, V> cacheCtx = cached.context(); - - while (true) { - try { - return cached.addReader(nearNodeId, msgId, topVer); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry when adding to DHT local transaction: " + cached); - - cached = cacheCtx.dht().entryExx(entry.key(), topVer); - } - } - } - - return null; - } - - /** {@inheritDoc} */ - @Override protected void updateExplicitVersion(IgniteTxEntry<K, V> txEntry, GridCacheEntryEx<K, V> entry) - throws GridCacheEntryRemovedException { - // DHT local transactions don't have explicit locks. - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() { - if (optimistic()) { - assert isSystemInvalidate(); - - return prepareAsync(null, null, Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), 0, nearMiniId, null, true, - null); - } - - // For pessimistic mode we don't distribute prepare request. - GridDhtTxPrepareFuture<K, V> fut = prepFut.get(); - - if (fut == null) { - // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, - Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), true, null))) - return prepFut.get(); - } - else - // Prepare was called explicitly. 
- return fut; - - if (!state(PREPARING)) { - if (setRollbackOnly()) { - if (timedOut()) - fut.onError(new IgniteTxTimeoutException("Transaction timed out and was rolled back: " + this)); - else - fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + - ", tx=" + this + ']')); - } - else - fut.onError(new IgniteTxRollbackException("Invalid transaction state for prepare [state=" + state() - + ", tx=" + this + ']')); - - return fut; - } - - try { - userPrepare(); - - if (!state(PREPARED)) { - setRollbackOnly(); - - fut.onError(new IgniteCheckedException("Invalid transaction state for commit [state=" + state() + - ", tx=" + this + ']')); - - return fut; - } - - fut.complete(); - - return fut; - } - catch (IgniteCheckedException e) { - fut.onError(e); - - return fut; - } - } - - /** - * Prepares next batch of entries in dht transaction. - * - * @param reads Read entries. - * @param writes Write entries. - * @param verMap Version map. - * @param msgId Message ID. - * @param nearMiniId Near mini future ID. - * @param txNodes Transaction nodes mapping. - * @param last {@code True} if this is last prepare request. - * @param lastBackups IDs of backup nodes receiving last prepare request. - * @return Future that will be completed when locks are acquired. - */ - public IgniteFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads, - @Nullable Iterable<IgniteTxEntry<K, V>> writes, - Map<IgniteTxKey<K>, GridCacheVersion> verMap, - long msgId, - IgniteUuid nearMiniId, - Map<UUID, Collection<UUID>> txNodes, - boolean last, - Collection<UUID> lastBackups) { - assert optimistic(); - - // In optimistic mode prepare still can be called explicitly from salvageTx. - GridDhtTxPrepareFuture<K, V> fut = prepFut.get(); - - if (fut == null) { - init(); - - // Future must be created before any exception can be thrown. 
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, verMap, last, - lastBackups))) { - GridDhtTxPrepareFuture<K, V> f = prepFut.get(); - - assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + - "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; - - return f; - } - } - else { - assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + - "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; - - // Prepare was called explicitly. - return fut; - } - - if (state() != PREPARING) { - if (!state(PREPARING)) { - if (state() == PREPARED && isSystemInvalidate()) - fut.complete(); - if (setRollbackOnly()) { - if (timedOut()) - fut.onError(new IgniteTxTimeoutException("Transaction timed out and was rolled back: " + - this)); - else - fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + - ", tx=" + this + ']')); - } - else - fut.onError(new IgniteTxRollbackException("Invalid transaction state for prepare [state=" + - state() + ", tx=" + this + ']')); - - return fut; - } - } - - try { - if (reads != null) - for (IgniteTxEntry<K, V> e : reads) - addEntry(msgId, e); - - if (writes != null) - for (IgniteTxEntry<K, V> e : writes) - addEntry(msgId, e); - - userPrepare(); - - // Make sure to add future before calling prepare on it. 
- cctx.mvcc().addFuture(fut); - - if (isSystemInvalidate()) - fut.complete(); - else - fut.prepare(reads, writes, txNodes); - } - catch (IgniteTxTimeoutException | IgniteTxOptimisticException e) { - fut.onError(e); - } - catch (IgniteCheckedException e) { - setRollbackOnly(); - - fut.onError(new IgniteTxRollbackException("Failed to prepare transaction: " + this, e)); - - try { - rollback(); - } - catch (IgniteTxOptimisticException e1) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e1 + ']'); - - fut.onError(e); - } - catch (IgniteCheckedException e1) { - U.error(log, "Failed to rollback transaction: " + this, e1); - } - } - - return fut; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public IgniteFuture<IgniteTx> commitAsync() { - if (log.isDebugEnabled()) - log.debug("Committing dht local tx: " + this); - - // In optimistic mode prepare was called explicitly. - if (pessimistic()) - prepareAsync(); - - final GridDhtTxFinishFuture<K, V> fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); - - cctx.mvcc().addFuture(fut); - - GridDhtTxPrepareFuture<K, V> prep = prepFut.get(); - - if (prep != null) { - if (prep.isDone()) { - try { - prep.get(); // Check for errors of a parent future. - - if (finish(true)) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); - } - catch (IgniteTxOptimisticException e) { - if (log.isDebugEnabled()) - log.debug("Failed to optimistically prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare transaction: " + this, e); - - fut.onError(e); - } - } - else - prep.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() { - @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> f) { - try { - f.get(); // Check for errors of a parent future. 
- - if (finish(true)) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + - CU.txString(GridDhtTxLocal.this))); - } - catch (IgniteTxOptimisticException e) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare transaction: " + this, e); - - fut.onError(e); - } - } - }); - } - else { - assert optimistic(); - - try { - if (finish(true)) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); - } - catch (IgniteTxOptimisticException e) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to commit transaction: " + this, e); - - fut.onError(e); - } - } - - return fut; - } - - /** {@inheritDoc} */ - @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture<K, V> fut) { - assert optimistic(); - - prepFut.compareAndSet(fut, null); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<IgniteTx> rollbackAsync() { - GridDhtTxPrepareFuture<K, V> prepFut = this.prepFut.get(); - - final GridDhtTxFinishFuture<K, V> fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); - - cctx.mvcc().addFuture(fut); - - if (prepFut == null) { - try { - if (finish(false) || state() == UNKNOWN) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); - } - catch (IgniteTxOptimisticException e) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction (will make the best effort to rollback remote nodes): " + - this, e); - 
- fut.onError(e); - } - } - else { - prepFut.complete(); - - prepFut.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() { - @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> f) { - try { - f.get(); // Check for errors of a parent future. - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to prepare or rollback transaction [tx=" + this + ", e=" + e + ']'); - } - - try { - if (finish(false) || state() == UNKNOWN) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + - CU.txString(GridDhtTxLocal.this))); - - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully rollback transaction: " + CU.txString(GridDhtTxLocal.this), - e); - - fut.onError(e); - } - } - }); - } - - return fut; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) - @Override public boolean finish(boolean commit) throws IgniteCheckedException { - assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate() - || onePhaseCommit() || state() == PREPARED : - "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit + - ", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']'; - - assert nearMiniId != null; - - return super.finish(commit); - } - - /** {@inheritDoc} */ - @Override protected void sendFinishReply(boolean commit, @Nullable Throwable err) { - if (nearFinFutId != null) { - if (nearNodeId.equals(cctx.localNodeId())) { - if (log.isDebugEnabled()) - log.debug("Skipping response sending to local node: " + this); - - return; - } - - GridNearTxFinishResponse<K, V> res = new GridNearTxFinishResponse<>(nearXidVer, threadId, nearFinFutId, - nearFinMiniId, err); - - try { - cctx.io().send(nearNodeId, res, system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Node left before sending finish response (transaction was committed) [node=" + - nearNodeId + ", res=" + res + ']'); - } - catch (Throwable ex) { - U.error(log, "Failed to send finish response to node (transaction was " + - (commit ? "committed" : "rolledback") + ") [node=" + nearNodeId + ", res=" + res + ']', ex); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Will not send finish reply because sender node has not sent finish request yet: " + this); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridDhtTxLocal.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java deleted file mode 100644 index dd8815b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ /dev/null @@ -1,831 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.transactions.IgniteTxState.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; - -/** - * Replicated user transaction. - */ -public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Near mappings. */ - protected Map<UUID, GridDistributedTxMapping<K, V>> nearMap = - new ConcurrentHashMap8<>(); - - /** DHT mappings. */ - protected Map<UUID, GridDistributedTxMapping<K, V>> dhtMap = - new ConcurrentHashMap8<>(); - - /** Mapped flag. 
*/ - private AtomicBoolean mapped = new AtomicBoolean(); - - /** ID of the thread that constructed this transaction (captured in the constructor). */ - private long dhtThreadId; - - /** Needs-completed-versions flag. */ - private boolean needsCompletedVers; - - /** Versions of pending locks for entries of this tx. */ - private Collection<GridCacheVersion> pendingVers; - - /** - * Empty constructor required for {@link Externalizable}. - */ - protected GridDhtTxLocalAdapter() { - // No-op. - } - - /** - * Creates the transaction and records the ID of the constructing thread as both the thread ID and the - * DHT thread ID. - * - * @param cctx Cache context. - * @param xidVer Transaction version. - * @param implicit Implicit flag. - * @param implicitSingle Implicit-with-single-key flag. - * @param sys System flag. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - * @param invalidate Invalidate flag. - * @param storeEnabled Store enabled flag. - * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param partLock If this is a group-lock transaction and the whole partition should be locked. - * @param subjId Subject ID. - * @param taskNameHash Task name hash. - */ - protected GridDhtTxLocalAdapter( - GridCacheSharedContext<K, V> cctx, - GridCacheVersion xidVer, - boolean implicit, - boolean implicitSingle, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - boolean invalidate, - boolean storeEnabled, - int txSize, - @Nullable IgniteTxKey grpLockKey, - boolean partLock, - @Nullable UUID subjId, - int taskNameHash - ) { - super(cctx, xidVer, implicit, implicitSingle, sys, concurrency, isolation, timeout, invalidate, storeEnabled, - txSize, grpLockKey, partLock, subjId, taskNameHash); - - assert cctx != null; - - threadId = Thread.currentThread().getId(); - dhtThreadId = threadId; - } - - /** - * @return Near node id. - */ - protected abstract UUID nearNodeId(); - - /** - * @return Near future ID. - */ - protected abstract IgniteUuid nearFutureId(); - - /** - * @return Near future mini ID. - */ - protected abstract IgniteUuid nearMiniId(); - - /** - * Adds reader to cached entry. - * - * @param msgId Message ID. - * @param cached Cached entry. - * @param entry Transaction entry. 
- * @param topVer Topology version. - * @return {@code True} if reader was added as a result of this call. - */ - @Nullable protected abstract IgniteFuture<Boolean> addReader(long msgId, - GridDhtCacheEntry<K, V> cached, - IgniteTxEntry<K, V> entry, - long topVer); - - /** - * @param commit Commit flag. - * @param err Error, if any. - */ - protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err); - - /** - * The given value is OR-ed into the current flag, so passing {@code false} never clears it. - * - * @param needsCompletedVers {@code True} if needs completed versions. - */ - public void needsCompletedVersions(boolean needsCompletedVers) { - this.needsCompletedVers |= needsCompletedVers; - } - - /** {@inheritDoc} */ - @Override public boolean needsCompletedVersions() { - return needsCompletedVers; - } - - /** - * @return Versions for all pending locks that were in queue before tx locks were released, - * or an empty list if none were set. - */ - public Collection<GridCacheVersion> pendingVersions() { - return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers; - } - - /** - * @param pendingVers Versions for all pending locks that were in queue before tx locks were released. - */ - public void pendingVersions(Collection<GridCacheVersion> pendingVers) { - this.pendingVers = pendingVers; - } - - /** - * @return DHT thread ID. - */ - long dhtThreadId() { - return dhtThreadId; - } - - /** - * Map explicit locks. - */ - protected void mapExplicitLocks() { - if (!mapped.get()) { - // Explicit locks may participate in implicit transactions only. 
- if (!implicit()) { - mapped.set(true); - - return; - } - - Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtEntryMap = null; - Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearEntryMap = null; - - for (IgniteTxEntry<K, V> e : allEntries()) { - assert e.cached() != null; - - GridCacheContext<K, V> cacheCtx = e.cached().context(); - - if (cacheCtx.isNear()) - continue; - - if (e.cached().obsolete()) { - GridCacheEntryEx<K, V> cached = cacheCtx.cache().entryEx(e.key()); - - e.cached(cached, cached.keyBytes()); - } - - if (e.cached().detached() || e.cached().isLocal()) - continue; - - while (true) { - try { - // Map explicit locks. - if (e.explicitVersion() != null && !e.explicitVersion().equals(xidVer)) { - if (dhtEntryMap == null) - dhtEntryMap = new GridLeanMap<>(); - - if (nearEntryMap == null) - nearEntryMap = new GridLeanMap<>(); - - cacheCtx.dhtMap(nearNodeId(), topologyVersion(), - (GridDhtCacheEntry<K, V>)e.cached(), log, dhtEntryMap, nearEntryMap); - } - - break; - } - catch (GridCacheEntryRemovedException ignore) { - GridCacheEntryEx<K, V> cached = cacheCtx.cache().entryEx(e.key()); - - e.cached(cached, cached.keyBytes()); - } - } - } - - if (!F.isEmpty(dhtEntryMap)) - addDhtMapping(dhtEntryMap); - - if (!F.isEmpty(nearEntryMap)) - addNearMapping(nearEntryMap); - - mapped.set(true); - } - } - - /** - * @return DHT map. - */ - Map<UUID, GridDistributedTxMapping<K, V>> dhtMap() { - mapExplicitLocks(); - - return dhtMap; - } - - /** - * @return Near map. - */ - Map<UUID, GridDistributedTxMapping<K, V>> nearMap() { - mapExplicitLocks(); - - return nearMap; - } - - /** - * @param nodeId Node ID. - * @return Mapping. - */ - GridDistributedTxMapping<K, V> dhtMapping(UUID nodeId) { - return dhtMap.get(nodeId); - } - - /** - * @param nodeId Node ID. - * @return Mapping. - */ - GridDistributedTxMapping<K, V> nearMapping(UUID nodeId) { - return nearMap.get(nodeId); - } - - /** - * @param mappings Mappings to add. 
- */ - void addDhtMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) { - addMapping(mappings, dhtMap); - } - - /** - * @param mappings Mappings to add. - */ - void addNearMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) { - addMapping(mappings, nearMap); - } - - /** - * @param nodeId Node ID. - * @return {@code True} if mapping was removed. - */ - public boolean removeMapping(UUID nodeId) { - return removeMapping(nodeId, null, dhtMap) | removeMapping(nodeId, null, nearMap); - } - - /** - * @param nodeId Node ID. - * @param entry Entry to remove. - * @return {@code True} if was removed. - */ - boolean removeDhtMapping(UUID nodeId, GridCacheEntryEx<K, V> entry) { - return removeMapping(nodeId, entry, dhtMap); - } - - /** - * @param nodeId Node ID. - * @param entry Entry to remove. - * @return {@code True} if was removed. - */ - boolean removeNearMapping(UUID nodeId, GridCacheEntryEx<K, V> entry) { - return removeMapping(nodeId, entry, nearMap); - } - - /** - * @param nodeId Node ID. - * @param entry Entry to remove. - * @param map Map to remove from. - * @return {@code True} if was removed. - */ - private boolean removeMapping(UUID nodeId, @Nullable GridCacheEntryEx<K, V> entry, - Map<UUID, GridDistributedTxMapping<K, V>> map) { - if (entry != null) { - if (log.isDebugEnabled()) - log.debug("Removing mapping for entry [nodeId=" + nodeId + ", entry=" + entry + ']'); - - IgniteTxEntry<K, V> txEntry = txMap.get(entry.txKey()); - - if (txEntry == null) - return false; - - GridDistributedTxMapping<K, V> m = map.get(nodeId); - - boolean ret = m != null && m.removeEntry(txEntry); - - if (m != null && m.empty()) - map.remove(nodeId); - - return ret; - } - else - return map.remove(nodeId) != null; - } - - /** - * @param mappings Entry mappings. - * @param map Transaction mappings. 
- */ - private void addMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings, - Map<UUID, GridDistributedTxMapping<K, V>> map) { - for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapping : mappings.entrySet()) { - ClusterNode n = mapping.getKey(); - - for (GridDhtCacheEntry<K, V> entry : mapping.getValue()) { - IgniteTxEntry<K, V> txEntry = txMap.get(entry.txKey()); - - if (txEntry != null) { - GridDistributedTxMapping<K, V> m = map.get(n.id()); - - if (m == null) - map.put(n.id(), m = new GridDistributedTxMapping<>(n)); - - m.add(txEntry); - } - } - } - } - - - /** {@inheritDoc} */ - @Override public void addInvalidPartition(GridCacheContext<K, V> ctx, int part) { - assert false : "DHT transaction encountered invalid partition [part=" + part + ", tx=" + this + ']'; - } - - - /** - * @param msgId Message ID. - * @param e Entry to add. - * @return Future for active transactions for the time when reader was added. - * @throws IgniteCheckedException If failed. - */ - @Nullable public IgniteFuture<Boolean> addEntry(long msgId, IgniteTxEntry<K, V> e) throws IgniteCheckedException { - init(); - - IgniteTxState state = state(); - - assert state == ACTIVE || (state == PREPARING && optimistic()) : "Invalid tx state for " + - "adding entry [msgId=" + msgId + ", e=" + e + ", tx=" + this + ']'; - - e.unmarshal(cctx, false, cctx.deploy().globalLoader()); - - checkInternal(e.txKey()); - - state = state(); - - assert state == ACTIVE || (state == PREPARING && optimistic()): "Invalid tx state for adding entry: " + e; - - GridCacheContext<K, V> cacheCtx = e.context(); - - GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); - - try { - IgniteTxEntry<K, V> entry = txMap.get(e.txKey()); - - if (entry != null) { - entry.op(e.op()); // Absolutely must set operation, as default is DELETE. 
- entry.value(e.value(), e.hasWriteValue(), e.hasReadValue()); - entry.entryProcessors(e.entryProcessors()); - entry.valueBytes(e.valueBytes()); - entry.ttl(e.ttl()); - entry.filters(e.filters()); - entry.drExpireTime(e.drExpireTime()); - } - else { - entry = e; - - addActiveCache(dhtCache.context()); - - while (true) { - GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(entry.key(), topologyVersion()); - - try { - // Set key bytes to avoid serializing in future. - cached.keyBytes(entry.keyBytes()); - - entry.cached(cached, entry.keyBytes()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry when adding to dht tx (will retry): " + cached); - } - } - - GridCacheVersion explicit = entry.explicitVersion(); - - if (explicit != null) { - GridCacheVersion dhtVer = cctx.mvcc().mappedVersion(explicit); - - if (dhtVer == null) - throw new IgniteCheckedException("Failed to find dht mapping for explicit entry version: " + entry); - - entry.explicitVersion(dhtVer); - } - - txMap.put(entry.txKey(), entry); - - if (log.isDebugEnabled()) - log.debug("Added entry to transaction: " + entry); - } - - return addReader(msgId, dhtCache.entryExx(entry.key()), entry, topologyVersion()); - } - catch (GridDhtInvalidPartitionException ex) { - addInvalidPartition(cacheCtx, ex.partition()); - - return new GridFinishedFuture<>(cctx.kernalContext(), true); - } - } - - /** - * @param cacheCtx Cache context. - * @param entries Entries to lock. - * @param writeEntries Write entries for implicit transactions mapped to one node. - * @param onePhaseCommit One phase commit flag. - * @param drVers DR versions. - * @param msgId Message ID. - * @param implicit Implicit flag. - * @param read Read flag. - * @param accessTtl TTL for read operation. - * @return Lock future. 
- */ - IgniteFuture<GridCacheReturn<V>> lockAllAsync( - GridCacheContext<K, V> cacheCtx, - Collection<GridCacheEntryEx<K, V>> entries, - List<IgniteTxEntry<K, V>> writeEntries, - boolean onePhaseCommit, - GridCacheVersion[] drVers, - long msgId, - boolean implicit, - final boolean read, - long accessTtl - ) { - try { - checkValid(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - - final GridCacheReturn<V> ret = new GridCacheReturn<>(false); - - if (F.isEmpty(entries)) - return new GridFinishedFuture<>(cctx.kernalContext(), ret); - - init(); - - onePhaseCommit(onePhaseCommit); - - try { - assert drVers == null || entries.size() == drVers.length; - - Set<K> skipped = null; - - int idx = 0; - int drVerIdx = 0; - - long topVer = topologyVersion(); - - GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); - - // Enlist locks into transaction. - for (GridCacheEntryEx<K, V> entry : entries) { - K key = entry.key(); - - IgniteTxEntry<K, V> txEntry = entry(entry.txKey()); - - // First time access. - if (txEntry == null) { - GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(key, topVer); - - addActiveCache(dhtCache.context()); - - cached.unswap(!read, read); - - IgniteTxEntry<K, V> - w = writeEntries == null ? null : writeEntries.get(idx++); - - txEntry = addEntry(NOOP, - null, - null, - null, - cached, - null, - CU.<K, V>empty(), - false, - -1L, - -1L, - drVers != null ? 
drVers[drVerIdx++] : null); - - if (w != null) { - assert key.equals(w.key()) : "Invalid entry [cached=" + cached + ", w=" + w + ']'; - - txEntry.op(w.op()); - txEntry.value(w.value(), w.hasWriteValue(), w.hasReadValue()); - txEntry.valueBytes(w.valueBytes()); - txEntry.drVersion(w.drVersion()); - txEntry.entryProcessors(w.entryProcessors()); - txEntry.ttl(w.ttl()); - txEntry.filters(w.filters()); - txEntry.drExpireTime(w.drExpireTime()); - txEntry.expiry(w.expiry()); - } - else if (read) - txEntry.ttl(accessTtl); - - txEntry.cached(cached, txEntry.keyBytes()); - - addReader(msgId, cached, txEntry, topVer); - } - else { - if (skipped == null) - skipped = new GridLeanSet<>(); - - skipped.add(key); - } - } - - assert pessimistic(); - - Collection<K> keys = F.viewReadOnly(entries, CU.<K, V>entry2Key()); - - // Acquire locks only after having added operation to the write set. - // Otherwise, during rollback we will not know whether locks need - // to be rolled back. - // Loose all skipped and previously locked (we cannot reenter locks here). - final Collection<? extends K> passedKeys = skipped != null ? F.view(keys, F0.notIn(skipped)) : keys; - - if (log.isDebugEnabled()) - log.debug("Lock keys: " + passedKeys); - - return obtainLockAsync(cacheCtx, ret, passedKeys, read, skipped, accessTtl, null); - } - catch (IgniteCheckedException e) { - setRollbackOnly(); - - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - } - - /** - * @param cacheCtx Context. - * @param ret Return value. - * @param passedKeys Passed keys. - * @param read {@code True} if read. - * @param skipped Skipped keys. - * @param accessTtl TTL for read operation. - * @param filter Entry write filter. - * @return Future for lock acquisition. - */ - private IgniteFuture<GridCacheReturn<V>> obtainLockAsync( - final GridCacheContext<K, V> cacheCtx, - GridCacheReturn<V> ret, - final Collection<? 
extends K> passedKeys, - final boolean read, - final Set<K> skipped, - final long accessTtl, - @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" + - skipped + ']'); - - if (passedKeys.isEmpty()) - return new GridFinishedFuture<>(cctx.kernalContext(), ret); - - GridDhtTransactionalCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); - - IgniteFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, - lockTimeout(), - this, - isInvalidate(), - read, - /*retval*/false, - isolation, - accessTtl, - CU.<K, V>empty()); - - return new GridEmbeddedFuture<>( - fut, - new PLC1<GridCacheReturn<V>>(ret) { - @Override protected GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Acquired transaction lock on keys: " + passedKeys); - - postLockWrite(cacheCtx, - passedKeys, - skipped, - ret, - /*remove*/false, - /*retval*/false, - /*read*/read, - accessTtl, - filter == null ? CU.<K, V>empty() : filter, - /**computeInvoke*/false); - - return ret; - } - }, - cctx.kernalContext()); - } - - /** {@inheritDoc} */ - @Override protected void addGroupTxMapping(Collection<IgniteTxKey<K>> keys) { - assert groupLock(); - - for (GridDistributedTxMapping<K, V> mapping : dhtMap.values()) - mapping.entries(Collections.unmodifiableCollection(txMap.values()), true); - - // Here we know that affinity key for all given keys is our group lock key. - // Just add entries to dht mapping. - // Add near readers. If near cache is disabled on all nodes, do nothing. 
- Collection<UUID> backupIds = dhtMap.keySet(); - - Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> locNearMap = null; - - for (IgniteTxKey<K> key : keys) { - IgniteTxEntry<K, V> txEntry = entry(key); - - if (!txEntry.groupLockEntry() || txEntry.context().isNear()) - continue; - - assert txEntry.cached() instanceof GridDhtCacheEntry : "Invalid entry type: " + txEntry.cached(); - - while (true) { - try { - GridDhtCacheEntry<K, V> entry = (GridDhtCacheEntry<K, V>)txEntry.cached(); - - Collection<UUID> readers = entry.readers(); - - if (!F.isEmpty(readers)) { - Collection<ClusterNode> nearNodes = cctx.discovery().nodes(readers, F0.notEqualTo(nearNodeId()), - F.notIn(backupIds)); - - if (log.isDebugEnabled()) - log.debug("Mapping entry to near nodes [nodes=" + U.nodeIds(nearNodes) + ", entry=" + - entry + ']'); - - for (ClusterNode n : nearNodes) { - if (locNearMap == null) - locNearMap = new HashMap<>(); - - List<GridDhtCacheEntry<K, V>> entries = locNearMap.get(n); - - if (entries == null) - locNearMap.put(n, entries = new LinkedList<>()); - - entries.add(entry); - } - } - - break; - } - catch (GridCacheEntryRemovedException ignored) { - // Retry. 
- txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()), txEntry.keyBytes()); - } - } - } - - if (locNearMap != null) - addNearMapping(locNearMap); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) - @Override public boolean finish(boolean commit) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Finishing dht local tx [tx=" + this + ", commit=" + commit + "]"); - - if (optimistic()) - state(PREPARED); - - if (commit) { - if (!state(COMMITTING)) { - IgniteTxState state = state(); - - if (state != COMMITTING && state != COMMITTED) - throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state() + - ", tx=" + this + ']'); - else { - if (log.isDebugEnabled()) - log.debug("Invalid transaction state for commit (another thread is committing): " + this); - - return false; - } - } - } - else { - if (!state(ROLLING_BACK)) { - if (log.isDebugEnabled()) - log.debug("Invalid transaction state for rollback [state=" + state() + ", tx=" + this + ']'); - - return false; - } - } - - IgniteCheckedException err = null; - - // Commit to DB first. This way if there is a failure, transaction - // won't be committed. - try { - if (commit && !isRollbackOnly()) - userCommit(); - else - userRollback(); - } - catch (IgniteCheckedException e) { - err = e; - - commit = false; - - // If heuristic error. - if (!isRollbackOnly()) { - systemInvalidate(true); - - U.warn(log, "Set transaction invalidation flag to true due to error [tx=" + CU.txString(this) + - ", err=" + err + ']'); - } - } - - if (err != null) { - state(UNKNOWN); - - throw err; - } - else { - // Committed state will be set in finish future onDone callback. 
- if (commit) { - if (!onePhaseCommit()) { - if (!state(COMMITTED)) { - state(UNKNOWN); - - throw new IgniteCheckedException("Invalid transaction state for commit: " + this); - } - } - } - else { - if (!state(ROLLED_BACK)) { - state(UNKNOWN); - - throw new IgniteCheckedException("Invalid transaction state for rollback: " + this); - } - } - } - - return true; - } - - /** - * Removes previously created prepare future from atomic reference. - * - * @param fut Expected future. - */ - protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture<K, V> fut); - - /** {@inheritDoc} */ - @Override public void rollback() throws IgniteCheckedException { - try { - rollbackAsync().get(); - } - finally { - cctx.tm().txContextReset(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), - "dhtNodes", dhtMap.keySet(), "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxMapping.java deleted file mode 100644 index 1e58edd..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxMapping.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed.dht; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * DHT transaction mapping. - */ -public class GridDhtTxMapping<K, V> { - /** Transaction nodes mapping (primary node -> related backup nodes). */ - private final Map<UUID, Collection<UUID>> txNodes = new GridLeanMap<>(); - - /** Mappings in the order they were added; consecutive additions with the same primary share one element. */ - private final List<TxMapping> mappings = new ArrayList<>(); - - /** Most recently created mapping, {@code null} until the first {@link #addMapping(List)} call. */ - private TxMapping last; - - /** - * Adds information about next mapping. The first node is taken as the primary and all nodes not equal - * to it as backups. If the primary is the same as in the previous call, the backups are merged into the - * previous mapping; otherwise a new mapping is appended. The backups are also accumulated in the - * primary-to-backups map returned by {@link #transactionNodes()}. - * - * @param nodes Nodes. The first node is dereferenced without a null check, so the list is presumably - * never empty (confirm against callers). - */ - @SuppressWarnings("ConstantConditions") - public void addMapping(List<ClusterNode> nodes) { - ClusterNode primary = F.first(nodes); - - Collection<ClusterNode> backups = F.view(nodes, F.notEqualTo(primary)); - - if (last == null || !last.primary.equals(primary.id())) { - last = new TxMapping(primary, backups); - - mappings.add(last); - } - else - last.add(backups); - - Collection<UUID> storedBackups = txNodes.get(last.primary); - - if (storedBackups == null) - txNodes.put(last.primary, storedBackups = new HashSet<>()); - - storedBackups.addAll(last.backups); - } - - /** - * @return Primary to backup mapping. 
- */ - public Map<UUID, Collection<UUID>> transactionNodes() { - return txNodes; - } - - /** - * For each mapping sets flags indicating if mapping is last for node. The given mappings are matched - * by position with the mappings recorded by {@link #addMapping(List)}. - * - * @param mappings Mappings; the size must equal the number of recorded mappings (asserted). - */ - public void initLast(Collection<GridDistributedTxMapping<K, V>> mappings) { - assert this.mappings.size() == mappings.size(); - - int idx = 0; - - for (GridDistributedTxMapping<?, ?> map : mappings) { - TxMapping mapping = this.mappings.get(idx); - - map.lastBackups(lastBackups(mapping, idx)); - - boolean last = true; - - for (int i = idx + 1; i < this.mappings.size(); i++) { - TxMapping nextMap = this.mappings.get(i); - - if (nextMap.primary.equals(mapping.primary)) { - last = false; - - break; - } - } - - map.last(last); - - idx++; - } - } - - /** - * @param mapping Mapping. - * @param idx Mapping index. - * @return IDs of backup nodes receiving last prepare request during this mapping, or {@code null} if - * every backup also appears in a later mapping with the same primary. - */ - @Nullable private Collection<UUID> lastBackups(TxMapping mapping, int idx) { - Collection<UUID> res = null; - - for (UUID backup : mapping.backups) { - boolean foundNext = false; - - for (int i = idx + 1; i < mappings.size(); i++) { - TxMapping nextMap = mappings.get(i); - - if (nextMap.primary.equals(mapping.primary) && nextMap.backups.contains(backup)) { - foundNext = true; - - break; - } - } - - if (!foundNext) { - if (res == null) - res = new ArrayList<>(mapping.backups.size()); - - res.add(backup); - } - } - - return res; - } - - /** - * Primary node ID together with the IDs of its backup nodes. - */ - private static class TxMapping { - /** Primary node ID. */ - private final UUID primary; - - /** Backup node IDs. */ - private final Set<UUID> backups; - - /** - * @param primary Primary node. - * @param backups Backup nodes. - */ - private TxMapping(ClusterNode primary, Iterable<ClusterNode> backups) { - this.primary = primary.id(); - - this.backups = new HashSet<>(); - - add(backups); - } - - /** - * @param backups Backup nodes. - */ - private void add(Iterable<ClusterNode> backups) { - for (ClusterNode n : backups) - this.backups.add(n.id()); - } - } -}