http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishResponse.java deleted file mode 100644 index 3cd698d..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishResponse.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.tostring.*; - -import java.io.*; -import java.nio.*; - -/** - * Transaction finish response. - */ -public class GridDistributedTxFinishResponse<K, V> extends GridCacheMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private GridCacheVersion txId; - - /** Future ID. */ - private IgniteUuid futId; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridDistributedTxFinishResponse() { - /* No-op. */ - } - - /** - * @param txId Transaction id. - * @param futId Future ID. - */ - public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) { - assert txId != null; - assert futId != null; - - this.txId = txId; - this.futId = futId; - } - - /** - * - * @return Transaction id. - */ - public GridCacheVersion xid() { - return txId; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors", - "OverriddenMethodCallDuringObjectConstruction"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDistributedTxFinishResponse _clone = new GridDistributedTxFinishResponse(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDistributedTxFinishResponse _clone = (GridDistributedTxFinishResponse)_msg; - - _clone.txId = txId; - _clone.futId = futId; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 3: - if (!commState.putGridUuid(futId)) - return false; - - commState.idx++; - - case 4: - if (!commState.putCacheVersion(txId)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 3: - IgniteUuid futId0 = commState.getGridUuid(); - - if (futId0 == GRID_UUID_NOT_READ) - return false; - - futId = futId0; - - commState.idx++; - - case 4: - GridCacheVersion txId0 = commState.getCacheVersion(); - - if (txId0 == CACHE_VER_NOT_READ) - return false; - - txId = txId0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 25; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridDistributedTxFinishResponse.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxMapping.java deleted file mode 100644 index 4ab3a11..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxMapping.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.cluster.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Transaction node mapping. - */ -public class GridDistributedTxMapping<K, V> implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapped node. */ - @GridToStringExclude - private ClusterNode node; - - /** Entries. */ - @GridToStringInclude - private Collection<IgniteTxEntry<K, V>> entries; - - /** Explicit lock flag. */ - private boolean explicitLock; - - /** DHT version. */ - private GridCacheVersion dhtVer; - - /** Copy on remove flag. */ - private boolean readOnly; - - /** {@code True} if this is last mapping for node. */ - private boolean last; - - /** IDs of backup nodes receiving last prepare request during this mapping. */ - private Collection<UUID> lastBackups; - - /** {@code True} if mapping is for near caches, {@code false} otherwise. */ - private boolean near; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDistributedTxMapping() { - // No-op. - } - - /** - * @param node Mapped node. - */ - public GridDistributedTxMapping(ClusterNode node) { - this.node = node; - - entries = new ConcurrentLinkedQueue<>(); - } - - /** - * @return IDs of backup nodes receiving last prepare request during this mapping. - */ - @Nullable public Collection<UUID> lastBackups() { - return lastBackups; - } - - /** - * @param lastBackups IDs of backup nodes receiving last prepare request during this mapping. - */ - public void lastBackups(@Nullable Collection<UUID> lastBackups) { - this.lastBackups = lastBackups; - } - - /** - * @return {@code True} if this is last mapping for node. - */ - public boolean last() { - return last; - } - - /** - * @param last If {@code True} this is last mapping for node. - */ - public void last(boolean last) { - this.last = last; - } - - /** - * @return {@code True} if mapping is for near caches, {@code false} otherwise. - */ - public boolean near() { - return near; - } - - /** - * @param near {@code True} if mapping is for near caches, {@code false} otherwise. - */ - public void near(boolean near) { - this.near = near; - } - - /** - * @return Node. - */ - public ClusterNode node() { - return node; - } - - /** - * @return Entries. - */ - public Collection<IgniteTxEntry<K, V>> entries() { - return entries; - } - - /** - * @param entries Mapped entries. - * @param readOnly Flag indicating that passed in collection is read-only. - */ - public void entries(Collection<IgniteTxEntry<K, V>> entries, boolean readOnly) { - this.entries = entries; - - // Set copy on remove flag as passed in collection is unmodifiable. - this.readOnly = true; - } - - /** - * @return {@code True} if lock is explicit. - */ - public boolean explicitLock() { - return explicitLock; - } - - /** - * Sets explicit flag to {@code true}. - */ - public void markExplicitLock() { - explicitLock = true; - } - - /** - * @return DHT version. - */ - public GridCacheVersion dhtVersion() { - return dhtVer; - } - - /** - * @param dhtVer DHT version. - */ - public void dhtVersion(GridCacheVersion dhtVer) { - this.dhtVer = dhtVer; - - for (IgniteTxEntry<K, V> e : entries) - e.dhtVersion(dhtVer); - } - - /** - * @return Reads. - */ - public Collection<IgniteTxEntry<K, V>> reads() { - return F.view(entries, CU.<K, V>reads()); - } - - /** - * @return Writes. - */ - public Collection<IgniteTxEntry<K, V>> writes() { - return F.view(entries, CU.<K, V>writes()); - } - - /** - * @param entry Adds entry. - */ - public void add(IgniteTxEntry<K, V> entry) { - ensureModifiable(); - - entries.add(entry); - } - - /** - * @param entry Entry to remove. - * @return {@code True} if entry was removed. - */ - public boolean removeEntry(IgniteTxEntry<K, V> entry) { - ensureModifiable(); - - return entries.remove(entry); - } - - /** - * @param parts Evicts partitions from mapping. - */ - public void evictPartitions(@Nullable int[] parts) { - if (!F.isEmpty(parts)) { - ensureModifiable(); - - evictPartitions(parts, entries); - } - } - - /** - * @param parts Partitions. - * @param c Collection. - */ - private void evictPartitions(int[] parts, Collection<IgniteTxEntry<K, V>> c) { - assert parts != null; - - for (Iterator<IgniteTxEntry<K, V>> it = c.iterator(); it.hasNext();) { - IgniteTxEntry<K, V> e = it.next(); - - GridCacheEntryEx<K,V> cached = e.cached(); - - if (U.containsIntArray(parts, cached.partition())) - it.remove(); - } - } - - /** - * @param keys Keys to evict readers for. - */ - public void evictReaders(@Nullable Collection<IgniteTxKey<K>> keys) { - if (keys == null || keys.isEmpty()) - return; - - ensureModifiable(); - - evictReaders(keys, entries); - } - - /** - * @param keys Keys to evict readers for. - * @param entries Entries to check. - */ - private void evictReaders(Collection<IgniteTxKey<K>> keys, @Nullable Collection<IgniteTxEntry<K, V>> entries) { - if (entries == null || entries.isEmpty()) - return; - - for (Iterator<IgniteTxEntry<K, V>> it = entries.iterator(); it.hasNext();) { - IgniteTxEntry<K, V> entry = it.next(); - - if (keys.contains(entry.txKey())) - it.remove(); - } - } - - /** - * Copies collection of entries if it is read-only. - */ - private void ensureModifiable() { - if (readOnly) { - entries = new ConcurrentLinkedQueue<>(entries); - - readOnly = false; - } - } - - /** {@inheritDoc} */ - public boolean empty() { - return entries.isEmpty(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(node); - - U.writeCollection(out, entries); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - node = (ClusterNode)in.readObject(); - - entries = U.readCollection(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDistributedTxMapping.class, this, "node", node.id()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java deleted file mode 100644 index fae775a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ /dev/null @@ -1,776 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Transaction prepare request for optimistic and eventually consistent - * transactions. - */ -public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Thread ID. */ - @GridToStringInclude - private long threadId; - - /** Transaction concurrency. */ - @GridToStringInclude - private IgniteTxConcurrency concurrency; - - /** Transaction isolation. */ - @GridToStringInclude - private IgniteTxIsolation isolation; - - /** Commit version for EC transactions. */ - @GridToStringInclude - private GridCacheVersion commitVer; - - /** Transaction timeout. */ - @GridToStringInclude - private long timeout; - - /** Invalidation flag. */ - @GridToStringInclude - private boolean invalidate; - - /** Transaction read set. */ - @GridToStringInclude - @GridDirectTransient - private Collection<IgniteTxEntry<K, V>> reads; - - /** */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> readsBytes; - - /** Transaction write entries. */ - @GridToStringInclude - @GridDirectTransient - private Collection<IgniteTxEntry<K, V>> writes; - - /** */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> writesBytes; - - /** DHT versions to verify. */ - @GridToStringInclude - @GridDirectTransient - private Map<IgniteTxKey<K>, GridCacheVersion> dhtVers; - - /** Serialized map. */ - @GridToStringExclude - private byte[] dhtVersBytes; - - /** Group lock key, if any. */ - @GridToStringInclude - @GridDirectTransient - private IgniteTxKey grpLockKey; - - /** Group lock key bytes. */ - @GridToStringExclude - private byte[] grpLockKeyBytes; - - /** Partition lock flag. */ - private boolean partLock; - - /** Expected transaction size. */ - private int txSize; - - /** Transaction nodes mapping (primary node -> related backup nodes). */ - @GridDirectTransient - private Map<UUID, Collection<UUID>> txNodes; - - /** */ - private byte[] txNodesBytes; - - /** System flag. */ - private boolean sys; - - /** - * Required by {@link Externalizable}. - */ - public GridDistributedTxPrepareRequest() { - /* No-op. */ - } - - /** - * @param tx Cache transaction. - * @param reads Read entries. - * @param writes Write entries. - * @param grpLockKey Group lock key. - * @param partLock {@code True} if preparing group-lock transaction with partition lock. - * @param txNodes Transaction nodes mapping. - */ - public GridDistributedTxPrepareRequest( - IgniteTxEx<K, V> tx, - @Nullable Collection<IgniteTxEntry<K, V>> reads, - Collection<IgniteTxEntry<K, V>> writes, - IgniteTxKey grpLockKey, - boolean partLock, - Map<UUID, Collection<UUID>> txNodes - ) { - super(tx.xidVersion(), 0); - - commitVer = null; - threadId = tx.threadId(); - concurrency = tx.concurrency(); - isolation = tx.isolation(); - timeout = tx.timeout(); - invalidate = tx.isInvalidate(); - txSize = tx.size(); - sys = tx.system(); - - this.reads = reads; - this.writes = writes; - this.grpLockKey = grpLockKey; - this.partLock = partLock; - this.txNodes = txNodes; - } - - /** - * @return Transaction nodes mapping. - */ - public Map<UUID, Collection<UUID>> transactionNodes() { - return txNodes; - } - - /** - * @return System flag. - */ - public boolean system() { - return sys; - } - - /** - * Adds version to be verified on remote node. - * - * @param key Key for which version is verified. - * @param dhtVer DHT version to check. - */ - public void addDhtVersion(IgniteTxKey<K> key, @Nullable GridCacheVersion dhtVer) { - if (dhtVers == null) - dhtVers = new HashMap<>(); - - dhtVers.put(key, dhtVer); - } - - /** - * @return Map of versions to be verified. - */ - public Map<IgniteTxKey<K>, GridCacheVersion> dhtVersions() { - return dhtVers == null ? Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap() : dhtVers; - } - - /** - * @return Thread ID. - */ - public long threadId() { - return threadId; - } - - /** - * @return Commit version. - */ - public GridCacheVersion commitVersion() { return commitVer; } - - /** - * @return Invalidate flag. - */ - public boolean isInvalidate() { return invalidate; } - - /** - * @return Transaction timeout. - */ - public long timeout() { - return timeout; - } - - /** - * @return Concurrency. - */ - public IgniteTxConcurrency concurrency() { - return concurrency; - } - - /** - * @return Isolation level. - */ - public IgniteTxIsolation isolation() { - return isolation; - } - - /** - * @return Read set. - */ - public Collection<IgniteTxEntry<K, V>> reads() { - return reads; - } - - /** - * @return Write entries. - */ - public Collection<IgniteTxEntry<K, V>> writes() { - return writes; - } - - /** - * @param reads Reads. - */ - protected void reads(Collection<IgniteTxEntry<K, V>> reads) { - this.reads = reads; - } - - /** - * @param writes Writes. - */ - protected void writes(Collection<IgniteTxEntry<K, V>> writes) { - this.writes = writes; - } - - /** - * @return Group lock key if preparing group-lock transaction. - */ - @Nullable public IgniteTxKey groupLockKey() { - return grpLockKey; - } - - /** - * @return {@code True} if preparing group-lock transaction with partition lock. - */ - public boolean partitionLock() { - return partLock; - } - - /** - * @return Expected transaction size. - */ - public int txSize() { - return txSize; - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (writes != null) { - marshalTx(writes, ctx); - - writesBytes = new ArrayList<>(writes.size()); - - for (IgniteTxEntry<K, V> e : writes) - writesBytes.add(ctx.marshaller().marshal(e)); - } - - if (reads != null) { - marshalTx(reads, ctx); - - readsBytes = new ArrayList<>(reads.size()); - - for (IgniteTxEntry<K, V> e : reads) - readsBytes.add(ctx.marshaller().marshal(e)); - } - - if (grpLockKey != null && grpLockKeyBytes == null) - grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey); - - if (dhtVers != null && dhtVersBytes == null) - dhtVersBytes = ctx.marshaller().marshal(dhtVers); - - if (txNodes != null) - txNodesBytes = ctx.marshaller().marshal(txNodes); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (writesBytes != null) { - writes = new ArrayList<>(writesBytes.size()); - - for (byte[] arr : writesBytes) - writes.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr)); - - unmarshalTx(writes, false, ctx, ldr); - } - - if (readsBytes != null) { - reads = new ArrayList<>(readsBytes.size()); - - for (byte[] arr : readsBytes) - reads.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr)); - - unmarshalTx(reads, false, ctx, ldr); - } - - if (grpLockKeyBytes != null && grpLockKey == null) - grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr); - - if (dhtVersBytes != null && dhtVers == null) - dhtVers = ctx.marshaller().unmarshal(dhtVersBytes, ldr); - - if (txNodesBytes != null) - txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr); - } - - /** - * - * @param out Output. - * @param col Set to write. - * @throws IOException If write failed. - */ - private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry<K, V>> col) throws IOException { - boolean empty = F.isEmpty(col); - - if (!empty) { - out.writeInt(col.size()); - - for (IgniteTxEntry<K, V> e : col) { - V val = e.value(); - boolean hasWriteVal = e.hasWriteValue(); - boolean hasReadVal = e.hasReadValue(); - - try { - // Don't serialize value if invalidate is set to true. - if (invalidate) - e.value(null, false, false); - - out.writeObject(e); - } - finally { - // Set original value back. - e.value(val, hasWriteVal, hasReadVal); - } - } - } - else - out.writeInt(-1); - } - - /** - * @param in Input. - * @return Deserialized set. - * @throws IOException If deserialization failed. - * @throws ClassNotFoundException If deserialized class could not be found. - */ - @SuppressWarnings({"unchecked"}) - @Nullable private Collection<IgniteTxEntry<K, V>> readCollection(ObjectInput in) throws IOException, - ClassNotFoundException { - List<IgniteTxEntry<K, V>> col = null; - - int size = in.readInt(); - - // Check null flag. - if (size != -1) { - col = new ArrayList<>(size); - - for (int i = 0; i < size; i++) - col.add((IgniteTxEntry<K, V>)in.readObject()); - } - - return col == null ? Collections.<IgniteTxEntry<K,V>>emptyList() : col; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors", - "OverriddenMethodCallDuringObjectConstruction"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDistributedTxPrepareRequest _clone = new GridDistributedTxPrepareRequest(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDistributedTxPrepareRequest _clone = (GridDistributedTxPrepareRequest)_msg; - - _clone.threadId = threadId; - _clone.concurrency = concurrency; - _clone.isolation = isolation; - _clone.commitVer = commitVer; - _clone.timeout = timeout; - _clone.invalidate = invalidate; - _clone.reads = reads; - _clone.readsBytes = readsBytes; - _clone.writes = writes; - _clone.writesBytes = writesBytes; - _clone.dhtVers = dhtVers; - _clone.dhtVersBytes = dhtVersBytes; - _clone.grpLockKey = grpLockKey; - _clone.grpLockKeyBytes = grpLockKeyBytes; - _clone.partLock = partLock; - _clone.txSize = txSize; - _clone.txNodes = txNodes; - _clone.txNodesBytes = txNodesBytes; - _clone.sys = sys; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 8: - if (!commState.putCacheVersion(commitVer)) - return false; - - commState.idx++; - - case 9: - if (!commState.putEnum(concurrency)) - return false; - - commState.idx++; - - case 10: - if (!commState.putByteArray(dhtVersBytes)) - return false; - - commState.idx++; - - case 11: - if (!commState.putByteArray(grpLockKeyBytes)) - return false; - - commState.idx++; - - case 12: - if (!commState.putBoolean(invalidate)) - return false; - - commState.idx++; - - case 13: - if (!commState.putEnum(isolation)) - return false; - - commState.idx++; - - case 14: - if (!commState.putBoolean(partLock)) - return false; - - commState.idx++; - - case 15: - if (readsBytes != null) { - if (commState.it == null) { - if (!commState.putInt(readsBytes.size())) - return false; - - commState.it = readsBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 16: - if (!commState.putLong(threadId)) - return false; - - commState.idx++; - - case 17: - if (!commState.putLong(timeout)) - return false; - - commState.idx++; - - case 18: - if (!commState.putByteArray(txNodesBytes)) - return false; - - commState.idx++; - - case 19: - if (!commState.putInt(txSize)) - return false; - - commState.idx++; - - case 20: - if (writesBytes != null) { - if (commState.it == null) { - if (!commState.putInt(writesBytes.size())) - return false; - - commState.it = writesBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 21: - if (!commState.putBoolean(sys)) - return false; - - commState.idx++; - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 8: - GridCacheVersion commitVer0 = commState.getCacheVersion(); - - if (commitVer0 == CACHE_VER_NOT_READ) - return false; - - commitVer = commitVer0; - - commState.idx++; - - case 9: - if (buf.remaining() < 1) - return false; - - byte concurrency0 = commState.getByte(); - - concurrency = IgniteTxConcurrency.fromOrdinal(concurrency0); - - commState.idx++; - - case 10: - byte[] dhtVersBytes0 = commState.getByteArray(); - - if (dhtVersBytes0 == BYTE_ARR_NOT_READ) - return false; - - dhtVersBytes = dhtVersBytes0; - - commState.idx++; - - case 11: - byte[] grpLockKeyBytes0 = commState.getByteArray(); - - if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ) - return false; - - grpLockKeyBytes = grpLockKeyBytes0; - - commState.idx++; - - case 12: - if (buf.remaining() < 1) - return false; - - invalidate = commState.getBoolean(); - - commState.idx++; - - case 13: - if (buf.remaining() < 1) - return false; - - byte isolation0 = commState.getByte(); - - isolation = IgniteTxIsolation.fromOrdinal(isolation0); - - commState.idx++; - - case 14: - if (buf.remaining() < 1) - return false; - - partLock = commState.getBoolean(); - - commState.idx++; - - case 15: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (readsBytes == null) - readsBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - readsBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 16: - if (buf.remaining() < 8) - return false; - - threadId = commState.getLong(); - - commState.idx++; - - case 17: - if (buf.remaining() < 8) - return false; - - timeout = commState.getLong(); - - commState.idx++; - - case 18: - byte[] txNodesBytes0 = commState.getByteArray(); - - if (txNodesBytes0 == BYTE_ARR_NOT_READ) - return false; - - txNodesBytes = txNodesBytes0; - - commState.idx++; - - case 19: - if (buf.remaining() < 4) - return false; - - txSize = commState.getInt(); - - commState.idx++; - - case 20: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (writesBytes == null) - writesBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - writesBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 21: - if (buf.remaining() < 1) - return false; - - sys = commState.getBoolean(); - - commState.idx++; - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 26; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this, - "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareResponse.java deleted file mode 100644 index 1cc1ab4..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareResponse.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Response to prepare request. - */ -public class GridDistributedTxPrepareResponse<K, V> extends GridDistributedBaseMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Collections of local lock candidates. */ - @GridToStringInclude - @GridDirectTransient - private Map<K, Collection<GridCacheMvccCandidate<K>>> cands; - - /** */ - private byte[] candsBytes; - - /** Error. */ - @GridToStringExclude - @GridDirectTransient - private Throwable err; - - /** Serialized error. */ - private byte[] errBytes; - - /** - * Empty constructor (required by {@link Externalizable}). - */ - public GridDistributedTxPrepareResponse() { - /* No-op. */ - } - - /** - * @param xid Transaction ID. - */ - public GridDistributedTxPrepareResponse(GridCacheVersion xid) { - super(xid, 0); - } - - /** - * @param xid Lock ID. - * @param err Error. - */ - public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err) { - super(xid, 0); - - this.err = err; - } - - /** - * @return Error. - */ - public Throwable error() { - return err; - } - - /** - * @param err Error to set. - */ - public void error(Throwable err) { - this.err = err; - } - - /** - * @return Rollback flag. - */ - public boolean isRollback() { - return err != null; - } - - /** - * @param cands Candidates map to set. - */ - public void candidates(Map<K, Collection<GridCacheMvccCandidate<K>>> cands) { - this.cands = cands; - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (candsBytes == null && cands != null) { - if (ctx.deploymentEnabled()) { - for (K k : cands.keySet()) - prepareObject(k, ctx); - } - - candsBytes = CU.marshal(ctx, cands); - } - - if (err != null) - errBytes = ctx.marshaller().marshal(err); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (candsBytes != null && cands == null) - cands = ctx.marshaller().unmarshal(candsBytes, ldr); - - if (errBytes != null) - err = ctx.marshaller().unmarshal(errBytes, ldr); - } - - /** - * - * @param key Candidates key. - * @return Collection of lock candidates at given index. - */ - @Nullable public Collection<GridCacheMvccCandidate<K>> candidatesForKey(K key) { - assert key != null; - - if (cands == null) - return null; - - return cands.get(key); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors", - "OverriddenMethodCallDuringObjectConstruction"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDistributedTxPrepareResponse _clone = new GridDistributedTxPrepareResponse(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDistributedTxPrepareResponse _clone = (GridDistributedTxPrepareResponse)_msg; - - _clone.cands = cands; - _clone.candsBytes = candsBytes; - _clone.err = err; - _clone.errBytes = errBytes; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 8: - if (!commState.putByteArray(candsBytes)) - return false; - - commState.idx++; - - case 9: - if (!commState.putByteArray(errBytes)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 8: - byte[] candsBytes0 = commState.getByteArray(); - - if (candsBytes0 == BYTE_ARR_NOT_READ) - return false; - - candsBytes = candsBytes0; - - commState.idx++; - - case 9: - byte[] errBytes0 = commState.getByteArray(); - - if (errBytes0 == BYTE_ARR_NOT_READ) - return false; - - errBytes = errBytes0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 27; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridDistributedTxPrepareResponse.class, this, "err", - err == null ? "null" : err.toString(), "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java deleted file mode 100644 index e30c234..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ /dev/null @@ -1,775 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.transactions.IgniteTxState.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; -import static org.apache.ignite.internal.processors.dr.GridDrType.*; - -/** - * Transaction created by system implicitly on remote nodes. - */ -public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> - implements IgniteTxRemoteEx<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Read set. */ - @GridToStringInclude - protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap; - - /** Write map. */ - @GridToStringInclude - protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap; - - /** Remote thread ID. */ - @GridToStringInclude - private long rmtThreadId; - - /** Explicit versions. */ - @GridToStringInclude - private List<GridCacheVersion> explicitVers; - - /** Started flag. */ - @GridToStringInclude - private boolean started; - - /** {@code True} only if all write entries are locked by this transaction. */ - @GridToStringInclude - private AtomicBoolean commitAllowed = new AtomicBoolean(false); - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDistributedTxRemoteAdapter() { - // No-op. - } - - /** - * @param ctx Cache registry. - * @param nodeId Node ID. - * @param rmtThreadId Remote thread ID. - * @param xidVer XID version. - * @param commitVer Commit version. - * @param sys System flag. - * @param concurrency Concurrency level (should be pessimistic). - * @param isolation Transaction isolation. - * @param invalidate Invalidate flag. - * @param timeout Timeout. - * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param subjId Subject ID. - * @param taskNameHash Task name hash code. - */ - public GridDistributedTxRemoteAdapter( - GridCacheSharedContext<K, V> ctx, - UUID nodeId, - long rmtThreadId, - GridCacheVersion xidVer, - GridCacheVersion commitVer, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - boolean invalidate, - long timeout, - int txSize, - @Nullable IgniteTxKey grpLockKey, - @Nullable UUID subjId, - int taskNameHash - ) { - super( - ctx, - nodeId, - xidVer, - ctx.versions().last(), - Thread.currentThread().getId(), - sys, - concurrency, - isolation, - timeout, - txSize, - grpLockKey, - subjId, - taskNameHash); - - this.rmtThreadId = rmtThreadId; - this.invalidate = invalidate; - - commitVersion(commitVer); - - // Must set started flag after concurrency and isolation. - started = true; - } - - /** {@inheritDoc} */ - @Override public UUID eventNodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> masterNodeIds() { - return Collections.singleton(nodeId); - } - - /** {@inheritDoc} */ - @Override public UUID originatingNodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public Collection<Integer> activeCacheIds() { - return Collections.emptyList(); - } - - /** - * @return Checks if transaction has no entries. - */ - @Override public boolean empty() { - return readMap.isEmpty() && writeMap.isEmpty(); - } - - /** {@inheritDoc} */ - @Override public boolean removed(IgniteTxKey<K> key) { - IgniteTxEntry e = writeMap.get(key); - - return e != null && e.op() == DELETE; - } - - /** {@inheritDoc} */ - @Override public void invalidate(boolean invalidate) { - this.invalidate = invalidate; - } - - /** {@inheritDoc} */ - @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap() { - return writeMap; - } - - /** {@inheritDoc} */ - @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap() { - return readMap; - } - - /** {@inheritDoc} */ - @Override public void seal() { - // No-op. - } - - /** - * Adds group lock key to remote transaction. - * - * @param key Key. - */ - public void groupLockKey(IgniteTxKey key) { - if (grpLockKey == null) - grpLockKey = key; - } - - /** {@inheritDoc} */ - @Override public GridTuple<V> peek(GridCacheContext<K, V> cacheCtx, boolean failFast, K key, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws GridCacheFilterFailedException { - assert false : "Method peek can only be called on user transaction: " + this; - - throw new IllegalStateException("Method peek can only be called on user transaction: " + this); - } - - /** {@inheritDoc} */ - @Override public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key) { - IgniteTxEntry<K, V> e = writeMap == null ? null : writeMap.get(key); - - if (e == null) - e = readMap == null ? null : readMap.get(key); - - return e; - } - - /** - * Clears entry from transaction as it never happened. - * - * @param key key to be removed. - */ - public void clearEntry(IgniteTxKey<K> key) { - readMap.remove(key); - writeMap.remove(key); - } - - /** - * @param baseVer Base version. - * @param committedVers Committed versions. - * @param rolledbackVers Rolled back versions. - */ - @Override public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, - Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) { - if (readMap != null && !readMap.isEmpty()) { - for (IgniteTxEntry<K, V> txEntry : readMap.values()) - doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers); - } - - if (writeMap != null && !writeMap.isEmpty()) { - for (IgniteTxEntry<K, V> txEntry : writeMap.values()) - doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers); - } - } - - /** - * Adds completed versions to an entry. - * - * @param txEntry Entry. - * @param baseVer Base version for completed versions. - * @param committedVers Completed versions relative to base version. - * @param rolledbackVers Rolled back versions relative to base version. - * @param pendingVers Pending versions. - */ - private void doneRemote(IgniteTxEntry<K, V> txEntry, GridCacheVersion baseVer, - Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, - Collection<GridCacheVersion> pendingVers) { - while (true) { - GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached(); - - try { - // Handle explicit locks. - GridCacheVersion doneVer = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer; - - entry.doneRemote(doneVer, baseVer, pendingVers, committedVers, rolledbackVers, isSystemInvalidate()); - - break; - } - catch (GridCacheEntryRemovedException ignored) { - assert entry.obsoleteVersion() != null; - - if (log.isDebugEnabled()) - log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + ", tx=" + this + ']'); - - // Replace the entry. - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - - /** {@inheritDoc} */ - @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) { - try { - if (hasWriteKey(entry.txKey())) { - commitIfLocked(); - - return true; - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to commit remote transaction: " + this, e); - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isStarted() { - return started; - } - - /** - * @return Remote node thread ID. - */ - @Override public long remoteThreadId() { - return rmtThreadId; - } - - /** - * @param e Transaction entry to set. - * @return {@code True} if value was set. - */ - @Override public boolean setWriteValue(IgniteTxEntry<K, V> e) { - checkInternal(e.txKey()); - - IgniteTxEntry<K, V> entry = writeMap.get(e.txKey()); - - if (entry == null) { - IgniteTxEntry<K, V> rmv = readMap.remove(e.txKey()); - - if (rmv != null) { - e.cached(rmv.cached(), rmv.keyBytes()); - - writeMap.put(e.txKey(), e); - } - // If lock is explicit. - else { - e.cached(e.context().cache().entryEx(e.key()), null); - - // explicit lock. - writeMap.put(e.txKey(), e); - } - } - else { - // Copy values. - entry.value(e.value(), e.hasWriteValue(), e.hasReadValue()); - entry.entryProcessors(e.entryProcessors()); - entry.valueBytes(e.valueBytes()); - entry.op(e.op()); - entry.ttl(e.ttl()); - entry.explicitVersion(e.explicitVersion()); - entry.groupLockEntry(e.groupLockEntry()); - - // DR stuff. - entry.drVersion(e.drVersion()); - entry.drExpireTime(e.drExpireTime()); - } - - addExplicit(e); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean hasWriteKey(IgniteTxKey<K> key) { - return writeMap.containsKey(key); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() { - assert false; - return null; - } - - /** {@inheritDoc} */ - @Override public Set<IgniteTxKey<K>> readSet() { - return readMap.keySet(); - } - - /** {@inheritDoc} */ - @Override public Set<IgniteTxKey<K>> writeSet() { - return writeMap.keySet(); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteTxEntry<K, V>> allEntries() { - return F.concat(false, writeEntries(), readEntries()); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteTxEntry<K, V>> writeEntries() { - return writeMap.values(); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteTxEntry<K, V>> readEntries() { - return readMap.values(); - } - - /** - * Prepare phase. - * - * @throws IgniteCheckedException If prepare failed. - */ - @Override public void prepare() throws IgniteCheckedException { - // If another thread is doing prepare or rollback. - if (!state(PREPARING)) { - // In optimistic mode prepare may be called multiple times. - if(state() != PREPARING || !optimistic()) { - if (log.isDebugEnabled()) - log.debug("Invalid transaction state for prepare: " + this); - - return; - } - } - - try { - cctx.tm().prepareTx(this); - - if (pessimistic() || isSystemInvalidate()) - state(PREPARED); - } - catch (IgniteCheckedException e) { - setRollbackOnly(); - - throw e; - } - } - - /** - * @throws IgniteCheckedException If commit failed. - */ - @SuppressWarnings({"CatchGenericClass"}) - private void commitIfLocked() throws IgniteCheckedException { - if (state() == COMMITTING) { - for (IgniteTxEntry<K, V> txEntry : writeMap.values()) { - assert txEntry != null : "Missing transaction entry for tx: " + this; - - while (true) { - GridCacheEntryEx<K, V> cacheEntry = txEntry.cached(); - - assert cacheEntry != null : "Missing cached entry for transaction entry: " + txEntry; - - try { - GridCacheVersion ver = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer; - - // If locks haven't been acquired yet, keep waiting. - if (!txEntry.groupLockEntry() && !cacheEntry.lockedBy(ver)) { - if (log.isDebugEnabled()) - log.debug("Transaction does not own lock for entry (will wait) [entry=" + cacheEntry + - ", tx=" + this + ']'); - - return; - } - - break; // While. - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry while committing (will retry): " + txEntry); - - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - - // Only one thread gets to commit. - if (commitAllowed.compareAndSet(false, true)) { - IgniteCheckedException err = null; - - if (!F.isEmpty(writeMap)) { - // Register this transaction as completed prior to write-phase to - // ensure proper lock ordering for removed entries. - cctx.tm().addCommittedTx(this); - - long topVer = topologyVersion(); - - // Node that for near transactions we grab all entries. - for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) { - GridCacheContext<K, V> cacheCtx = txEntry.context(); - - boolean replicate = cacheCtx.isDrEnabled(); - - try { - while (true) { - try { - GridCacheEntryEx<K, V> cached = txEntry.cached(); - - if (cached == null) - txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key()), null); - - if (near() && cacheCtx.dr().receiveEnabled()) { - cached.markObsolete(xidVer); - - break; - } - - GridNearCacheEntry<K, V> nearCached = null; - - if (updateNearCache(cacheCtx, txEntry.key(), topVer)) - nearCached = cacheCtx.dht().near().peekExx(txEntry.key()); - - if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters())) - txEntry.cached().unswap(true, false); - - GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry, - false); - - GridCacheOperation op = res.get1(); - V val = res.get2(); - byte[] valBytes = res.get3(); - - GridCacheVersion explicitVer = txEntry.drVersion(); - - if (finalizationStatus() == FinalizationStatus.RECOVERY_FINISH || optimistic()) { - // Primary node has left the grid so we have to process conflicts on backups. - if (explicitVer == null) - explicitVer = writeVersion(); // Force write version to be used. - - GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached, - txEntry, - explicitVer, - op, - val, - valBytes, - txEntry.ttl(), - txEntry.drExpireTime()); - - if (drRes != null) { - op = drRes.operation(); - val = drRes.value(); - valBytes = drRes.valueBytes(); - - if (drRes.isMerge()) - explicitVer = writeVersion(); - else if (op == NOOP) - txEntry.ttl(-1L); - } - else - // Nullify explicit version so that innerSet/innerRemove will work as usual. - explicitVer = null; - } - - if (op == CREATE || op == UPDATE) { - // Invalidate only for near nodes (backups cannot be invalidated). - if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear())) - cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true, - topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE, - near() ? null : explicitVer, CU.subjectId(this, cctx), - resolveTaskName()); - else { - cached.innerSet(this, eventNodeId(), nodeId, val, valBytes, false, false, - txEntry.ttl(), true, true, topVer, txEntry.filters(), - replicate ? DR_BACKUP : DR_NONE, txEntry.drExpireTime(), - near() ? null : explicitVer, CU.subjectId(this, cctx), - resolveTaskName()); - - // Keep near entry up to date. - if (nearCached != null) { - V val0 = null; - byte[] valBytes0 = null; - - GridCacheValueBytes valBytesTuple = cached.valueBytes(); - - if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V)valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); - } - else - val0 = cached.rawGet(); - - nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(), - cached.ttl(), nodeId); - } - } - } - else if (op == DELETE) { - cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true, - topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE, - near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); - - // Keep near entry up to date. - if (nearCached != null) - nearCached.updateOrEvict(xidVer, null, null, 0, 0, nodeId); - } - else if (op == RELOAD) { - V reloaded = cached.innerReload(CU.<K, V>empty()); - - if (nearCached != null) { - nearCached.innerReload(CU.<K, V>empty()); - - nearCached.updateOrEvict(cached.version(), reloaded, null, - cached.expireTime(), cached.ttl(), nodeId); - } - } - else if (op == READ) { - assert near(); - - if (log.isDebugEnabled()) - log.debug("Ignoring READ entry when committing: " + txEntry); - } - // No-op. - else { - assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()): - "Transaction does not own lock for group lock entry during commit [tx=" + - this + ", txEntry=" + txEntry + ']'; - - if (txEntry.ttl() != -1L) - cached.updateTtl(null, txEntry.ttl()); - - if (nearCached != null) { - V val0 = null; - byte[] valBytes0 = null; - - GridCacheValueBytes valBytesTuple = cached.valueBytes(); - - if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V)valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); - } - else - val0 = cached.rawGet(); - - nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(), - cached.ttl(), nodeId); - } - } - - // Assert after setting values as we want to make sure - // that if we replaced removed entries. - assert - txEntry.op() == READ || onePhaseCommit() || - // If candidate is not there, then lock was explicit - // and we simply allow the commit to proceed. - !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) : - "Transaction does not own lock for commit [entry=" + cached + - ", tx=" + this + ']'; - - // Break out of while loop. - break; - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Attempting to commit a removed entry (will retry): " + txEntry); - - // Renew cached entry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - catch (Throwable ex) { - state(UNKNOWN); - - // In case of error, we still make the best effort to commit, - // as there is no way to rollback at this point. - err = ex instanceof IgniteCheckedException ? (IgniteCheckedException)ex : - new IgniteCheckedException("Commit produced a runtime exception: " + this, ex); - } - } - } - - if (err != null) { - state(UNKNOWN); - - throw err; - } - - cctx.tm().commitTx(this); - - state(COMMITTED); - } - } - } - - /** {@inheritDoc} */ - @Override public void commit() throws IgniteCheckedException { - if (optimistic()) - state(PREPARED); - - if (!state(COMMITTING)) { - IgniteTxState state = state(); - - // If other thread is doing commit, then no-op. - if (state == COMMITTING || state == COMMITTED) - return; - - if (log.isDebugEnabled()) - log.debug("Failed to set COMMITTING transaction state (will rollback): " + this); - - setRollbackOnly(); - - if (!isSystemInvalidate()) - throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']'); - - rollback(); - } - - commitIfLocked(); - } - - /** - * Forces commit for this tx. - * - * @throws IgniteCheckedException If commit failed. - */ - public void forceCommit() throws IgniteCheckedException { - commitIfLocked(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<IgniteTx> commitAsync() { - try { - commit(); - - return new GridFinishedFutureEx<IgniteTx>(this); - } - catch (IgniteCheckedException e) { - return new GridFinishedFutureEx<>(e); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CatchGenericClass"}) - @Override public void rollback() { - try { - // Note that we don't evict near entries here - - // they will be deleted by their corresponding transactions. - if (state(ROLLING_BACK)) { - cctx.tm().rollbackTx(this); - - state(ROLLED_BACK); - } - } - catch (RuntimeException | Error e) { - state(UNKNOWN); - - throw e; - } - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<IgniteTx> rollbackAsync() { - rollback(); - - return new GridFinishedFutureEx<IgniteTx>(this); - } - - /** {@inheritDoc} */ - @Override public Collection<GridCacheVersion> alternateVersions() { - return explicitVers == null ? Collections.<GridCacheVersion>emptyList() : explicitVers; - } - - /** - * Adds explicit version if there is one. - * - * @param e Transaction entry. - */ - protected void addExplicit(IgniteTxEntry<K, V> e) { - if (e.explicitVersion() != null) { - if (explicitVers == null) - explicitVers = new LinkedList<>(); - - if (!explicitVers.contains(e.explicitVersion())) { - explicitVers.add(e.explicitVersion()); - - if (log.isDebugEnabled()) - log.debug("Added explicit version to transaction [explicitVer=" + e.explicitVersion() + - ", tx=" + this + ']'); - - // Register alternate version with TM. - cctx.tm().addAlternateVersion(e.explicitVersion(), this); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedUnlockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedUnlockRequest.java deleted file mode 100644 index d4f1957..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedUnlockRequest.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Lock request message. - */ -public class GridDistributedUnlockRequest<K, V> extends GridDistributedBaseMessage<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Keys to unlock. */ - @GridDirectCollection(byte[].class) - private List<byte[]> keyBytes; - - /** Keys. */ - @GridDirectTransient - private List<K> keys; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridDistributedUnlockRequest() { - /* No-op. */ - } - - /** - * @param cacheId Cache ID. - * @param keyCnt Key count. - */ - public GridDistributedUnlockRequest(int cacheId, int keyCnt) { - super(keyCnt); - - this.cacheId = cacheId; - } - - /** - * @return Key to lock. - */ - public List<byte[]> keyBytes() { - return keyBytes; - } - - /** - * @return Keys. - */ - public List<K> keys() { - return keys; - } - - /** - * @param key Key. - * @param bytes Key bytes. - * @param ctx Context. - * @throws IgniteCheckedException If failed. - */ - public void addKey(K key, byte[] bytes, GridCacheContext<K, V> ctx) throws IgniteCheckedException { - boolean depEnabled = ctx.deploymentEnabled(); - - if (depEnabled) - prepareObject(key, ctx.shared()); - - if (keys == null) - keys = new ArrayList<>(keysCount()); - - keys.add(key); - - if (keyBytes == null) - keyBytes = new ArrayList<>(keysCount()); - - keyBytes.add(bytes); - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (F.isEmpty(keyBytes) && !F.isEmpty(keys)) - keyBytes = marshalCollection(keys, ctx); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (keys == null && !F.isEmpty(keyBytes)) - keys = unmarshalCollection(keyBytes, ctx, ldr); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors", - "OverriddenMethodCallDuringObjectConstruction"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridDistributedUnlockRequest _clone = new GridDistributedUnlockRequest(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridDistributedUnlockRequest _clone = (GridDistributedUnlockRequest)_msg; - - _clone.keyBytes = keyBytes; - _clone.keys = keys; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 8: - if (keyBytes != null) { - if (commState.it == null) { - if (!commState.putInt(keyBytes.size())) - return false; - - commState.it = keyBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 8: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (keyBytes == null) - keyBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - keyBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 28; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDistributedUnlockRequest.class, this, "keyBytesSize", - keyBytes == null ? 0 : keyBytes.size(), "super", super.toString()); - } -}