// Origin: Apache Ignite (incubating) commit b89b472d; file added as new (blob index 0000000..19470db).
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.nio.*;
import java.util.*;

/**
 * Transaction prepare request for optimistic and eventually consistent
 * transactions.
 * <p>
 * Object-typed state ({@code reads}, {@code writes}, {@code dhtVers}, {@code grpLockKey},
 * {@code txNodes}) is marked {@code @GridDirectTransient}: {@link #prepareMarshal} serializes it
 * into the matching {@code *Bytes} fields, {@link #writeTo} / {@link #readFrom} transfer those
 * byte fields together with the primitive fields, and {@link #finishUnmarshal} restores the objects.
 */
public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMessage<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Thread ID. */
    @GridToStringInclude
    private long threadId;

    /** Transaction concurrency. */
    @GridToStringInclude
    private IgniteTxConcurrency concurrency;

    /** Transaction isolation. */
    @GridToStringInclude
    private IgniteTxIsolation isolation;

    /** Commit version for EC transactions. */
    @GridToStringInclude
    private GridCacheVersion commitVer;

    /** Transaction timeout. */
    @GridToStringInclude
    private long timeout;

    /** Invalidation flag. */
    @GridToStringInclude
    private boolean invalidate;

    /** Transaction read set. */
    @GridToStringInclude
    @GridDirectTransient
    private Collection<IgniteTxEntry<K, V>> reads;

    /** Serialized read entries, one byte array per entry of {@link #reads}. */
    @GridDirectCollection(byte[].class)
    private Collection<byte[]> readsBytes;

    /** Transaction write entries. */
    @GridToStringInclude
    @GridDirectTransient
    private Collection<IgniteTxEntry<K, V>> writes;

    /** Serialized write entries, one byte array per entry of {@link #writes}. */
    @GridDirectCollection(byte[].class)
    private Collection<byte[]> writesBytes;

    /** DHT versions to verify. */
    @GridToStringInclude
    @GridDirectTransient
    private Map<IgniteTxKey<K>, GridCacheVersion> dhtVers;

    /** Serialized map. */
    @GridToStringExclude
    private byte[] dhtVersBytes;

    /** Group lock key, if any. */
    @GridToStringInclude
    @GridDirectTransient
    private IgniteTxKey grpLockKey;

    /** Group lock key bytes. */
    @GridToStringExclude
    private byte[] grpLockKeyBytes;

    /** Partition lock flag. */
    private boolean partLock;

    /** Expected transaction size. */
    private int txSize;

    /** Transaction nodes mapping (primary node -> related backup nodes). */
    @GridDirectTransient
    private Map<UUID, Collection<UUID>> txNodes;

    /** Serialized transaction nodes mapping. */
    private byte[] txNodesBytes;

    /** System flag. */
    private boolean sys;

    /**
     * Required by {@link Externalizable}.
     */
    public GridDistributedTxPrepareRequest() {
        /* No-op. */
    }

    /**
     * Creates a prepare request, copying thread ID, concurrency, isolation, timeout, invalidation
     * flag, size and system flag from the given transaction.
     *
     * @param tx Cache transaction.
     * @param reads Read entries.
     * @param writes Write entries.
     * @param grpLockKey Group lock key.
     * @param partLock {@code True} if preparing group-lock transaction with partition lock.
     * @param txNodes Transaction nodes mapping.
     */
    public GridDistributedTxPrepareRequest(
        IgniteTxEx<K, V> tx,
        @Nullable Collection<IgniteTxEntry<K, V>> reads,
        Collection<IgniteTxEntry<K, V>> writes,
        IgniteTxKey grpLockKey,
        boolean partLock,
        Map<UUID, Collection<UUID>> txNodes
    ) {
        super(tx.xidVersion(), 0);

        commitVer = null;
        threadId = tx.threadId();
        concurrency = tx.concurrency();
        isolation = tx.isolation();
        timeout = tx.timeout();
        invalidate = tx.isInvalidate();
        txSize = tx.size();
        sys = tx.system();

        this.reads = reads;
        this.writes = writes;
        this.grpLockKey = grpLockKey;
        this.partLock = partLock;
        this.txNodes = txNodes;
    }

    /**
     * @return Transaction nodes mapping.
     */
    public Map<UUID, Collection<UUID>> transactionNodes() {
        return txNodes;
    }

    /**
     * @return System flag.
     */
    public boolean system() {
        return sys;
    }

    /**
     * Adds version to be verified on remote node.
     *
     * @param key Key for which version is verified.
     * @param dhtVer DHT version to check.
     */
    public void addDhtVersion(IgniteTxKey<K> key, @Nullable GridCacheVersion dhtVer) {
        // The map is created lazily on first use.
        if (dhtVers == null)
            dhtVers = new HashMap<>();

        dhtVers.put(key, dhtVer);
    }

    /**
     * @return Map of versions to be verified; an empty map if no version was added.
     */
    public Map<IgniteTxKey<K>, GridCacheVersion> dhtVersions() {
        return dhtVers == null ? Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap() : dhtVers;
    }

    /**
     * @return Thread ID.
     */
    public long threadId() {
        return threadId;
    }

    /**
     * @return Commit version.
     */
    public GridCacheVersion commitVersion() {
        return commitVer;
    }

    /**
     * @return Invalidate flag.
     */
    public boolean isInvalidate() {
        return invalidate;
    }

    /**
     * @return Transaction timeout.
     */
    public long timeout() {
        return timeout;
    }

    /**
     * @return Concurrency.
     */
    public IgniteTxConcurrency concurrency() {
        return concurrency;
    }

    /**
     * @return Isolation level.
     */
    public IgniteTxIsolation isolation() {
        return isolation;
    }

    /**
     * @return Read set (the constructor accepts {@code null} for it).
     */
    public Collection<IgniteTxEntry<K, V>> reads() {
        return reads;
    }

    /**
     * @return Write entries.
     */
    public Collection<IgniteTxEntry<K, V>> writes() {
        return writes;
    }

    /**
     * @param reads Reads.
     */
    protected void reads(Collection<IgniteTxEntry<K, V>> reads) {
        this.reads = reads;
    }

    /**
     * @param writes Writes.
     */
    protected void writes(Collection<IgniteTxEntry<K, V>> writes) {
        this.writes = writes;
    }

    /**
     * @return Group lock key if preparing group-lock transaction.
     */
    @Nullable public IgniteTxKey groupLockKey() {
        return grpLockKey;
    }

    /**
     * @return {@code True} if preparing group-lock transaction with partition lock.
     */
    public boolean partitionLock() {
        return partLock;
    }

    /**
     * @return Expected transaction size.
     */
    public int txSize() {
        return txSize;
    }

    /** {@inheritDoc} */
    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
        super.prepareMarshal(ctx);

        if (writes != null) {
            marshalTx(writes, ctx);

            writesBytes = new ArrayList<>(writes.size());

            for (IgniteTxEntry<K, V> e : writes)
                writesBytes.add(ctx.marshaller().marshal(e));
        }

        if (reads != null) {
            marshalTx(reads, ctx);

            readsBytes = new ArrayList<>(reads.size());

            for (IgniteTxEntry<K, V> e : reads)
                readsBytes.add(ctx.marshaller().marshal(e));
        }

        // Group lock key and DHT versions are serialized only once; existing bytes are reused.
        if (grpLockKey != null && grpLockKeyBytes == null)
            grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey);

        if (dhtVers != null && dhtVersBytes == null)
            dhtVersBytes = ctx.marshaller().marshal(dhtVers);

        if (txNodes != null)
            txNodesBytes = ctx.marshaller().marshal(txNodes);
    }

    /** {@inheritDoc} */
    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
        super.finishUnmarshal(ctx, ldr);

        if (writesBytes != null) {
            writes = new ArrayList<>(writesBytes.size());

            for (byte[] arr : writesBytes)
                writes.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));

            unmarshalTx(writes, false, ctx, ldr);
        }

        if (readsBytes != null) {
            reads = new ArrayList<>(readsBytes.size());

            for (byte[] arr : readsBytes)
                reads.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));

            unmarshalTx(reads, false, ctx, ldr);
        }

        // Group lock key and DHT versions are restored only if not already present.
        if (grpLockKeyBytes != null && grpLockKey == null)
            grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);

        if (dhtVersBytes != null && dhtVers == null)
            dhtVers = ctx.marshaller().unmarshal(dhtVersBytes, ldr);

        if (txNodesBytes != null)
            txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
    }

    /**
     * Writes a collection of transaction entries: the size followed by every entry, or {@code -1}
     * when {@code F.isEmpty(col)} reports the collection as empty.
     * <p>
     * NOTE(review): this method is not referenced anywhere in this class; it looks like a leftover
     * of {@link Externalizable}-based serialization - confirm before removing.
     *
     * @param out Output.
     * @param col Set to write.
     * @throws IOException If write failed.
     */
    private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry<K, V>> col) throws IOException {
        boolean empty = F.isEmpty(col);

        if (!empty) {
            out.writeInt(col.size());

            for (IgniteTxEntry<K, V> e : col) {
                V val = e.value();
                boolean hasWriteVal = e.hasWriteValue();
                boolean hasReadVal = e.hasReadValue();

                try {
                    // Don't serialize value if invalidate is set to true.
                    if (invalidate)
                        e.value(null, false, false);

                    out.writeObject(e);
                }
                finally {
                    // Set original value back.
                    e.value(val, hasWriteVal, hasReadVal);
                }
            }
        }
        else
            out.writeInt(-1);
    }

    /**
     * Reads a collection written by {@link #writeCollection(ObjectOutput, Collection)}.
     * <p>
     * NOTE(review): this method is not referenced anywhere in this class - confirm before removing.
     *
     * @param in Input.
     * @return Deserialized entries; never {@code null} - an empty list is returned when the
     *      {@code -1} marker is read.
     * @throws IOException If deserialization failed.
     * @throws ClassNotFoundException If deserialized class could not be found.
     */
    @SuppressWarnings({"unchecked"})
    private Collection<IgniteTxEntry<K, V>> readCollection(ObjectInput in) throws IOException,
        ClassNotFoundException {
        int size = in.readInt();

        // -1 is the marker written instead of a size for an absent/empty collection.
        if (size == -1)
            return Collections.<IgniteTxEntry<K, V>>emptyList();

        List<IgniteTxEntry<K, V>> col = new ArrayList<>(size);

        for (int i = 0; i < size; i++)
            col.add((IgniteTxEntry<K, V>)in.readObject());

        return col;
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
        "OverriddenMethodCallDuringObjectConstruction"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridDistributedTxPrepareRequest<K, V> _clone = new GridDistributedTxPrepareRequest<>();

        clone0(_clone);

        return _clone;
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        // The target is always created by clone() of this class or of a subclass.
        @SuppressWarnings("unchecked")
        GridDistributedTxPrepareRequest<K, V> _clone = (GridDistributedTxPrepareRequest<K, V>)_msg;

        // Field-by-field copy: collections, maps and byte arrays are shared by reference.
        _clone.threadId = threadId;
        _clone.concurrency = concurrency;
        _clone.isolation = isolation;
        _clone.commitVer = commitVer;
        _clone.timeout = timeout;
        _clone.invalidate = invalidate;
        _clone.reads = reads;
        _clone.readsBytes = readsBytes;
        _clone.writes = writes;
        _clone.writesBytes = writesBytes;
        _clone.dhtVers = dhtVers;
        _clone.dhtVersBytes = dhtVersBytes;
        _clone.grpLockKey = grpLockKey;
        _clone.grpLockKeyBytes = grpLockKeyBytes;
        _clone.partLock = partLock;
        _clone.txSize = txSize;
        _clone.txNodes = txNodes;
        _clone.txNodesBytes = txNodesBytes;
        _clone.sys = sys;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Resumable write: {@code commState.idx} selects the next field and every case falls through
     * to the following one on purpose. A {@code false} return means the field did not fit into the
     * buffer; the next call continues from the same field. The case order must match
     * {@link #readFrom(ByteBuffer)}.
     */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.writeTo(buf))
            return false;

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        switch (commState.idx) {
            case 8:
                if (!commState.putCacheVersion(commitVer))
                    return false;

                commState.idx++;

            case 9:
                if (!commState.putEnum(concurrency))
                    return false;

                commState.idx++;

            case 10:
                if (!commState.putByteArray(dhtVersBytes))
                    return false;

                commState.idx++;

            case 11:
                if (!commState.putByteArray(grpLockKeyBytes))
                    return false;

                commState.idx++;

            case 12:
                if (!commState.putBoolean(invalidate))
                    return false;

                commState.idx++;

            case 13:
                if (!commState.putEnum(isolation))
                    return false;

                commState.idx++;

            case 14:
                if (!commState.putBoolean(partLock))
                    return false;

                commState.idx++;

            case 15:
                if (readsBytes != null) {
                    // Size is written once; the iterator keeps the position between calls.
                    if (commState.it == null) {
                        if (!commState.putInt(readsBytes.size()))
                            return false;

                        commState.it = readsBytes.iterator();
                    }

                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putByteArray((byte[])commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                } else {
                    // -1 marks an absent collection.
                    if (!commState.putInt(-1))
                        return false;
                }

                commState.idx++;

            case 16:
                if (!commState.putLong(threadId))
                    return false;

                commState.idx++;

            case 17:
                if (!commState.putLong(timeout))
                    return false;

                commState.idx++;

            case 18:
                if (!commState.putByteArray(txNodesBytes))
                    return false;

                commState.idx++;

            case 19:
                if (!commState.putInt(txSize))
                    return false;

                commState.idx++;

            case 20:
                if (writesBytes != null) {
                    // Size is written once; the iterator keeps the position between calls.
                    if (commState.it == null) {
                        if (!commState.putInt(writesBytes.size()))
                            return false;

                        commState.it = writesBytes.iterator();
                    }

                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putByteArray((byte[])commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                } else {
                    // -1 marks an absent collection.
                    if (!commState.putInt(-1))
                        return false;
                }

                commState.idx++;

            case 21:
                if (!commState.putBoolean(sys))
                    return false;

                commState.idx++;
        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Resumable read mirroring {@link #writeTo(ByteBuffer)}: {@code commState.idx} selects the next
     * field and every case falls through on purpose. A {@code false} return means the buffer does
     * not yet hold the whole field; the next call continues from the same field.
     */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.readFrom(buf))
            return false;

        switch (commState.idx) {
            case 8:
                GridCacheVersion commitVer0 = commState.getCacheVersion();

                if (commitVer0 == CACHE_VER_NOT_READ)
                    return false;

                commitVer = commitVer0;

                commState.idx++;

            case 9:
                if (buf.remaining() < 1)
                    return false;

                byte concurrency0 = commState.getByte();

                concurrency = IgniteTxConcurrency.fromOrdinal(concurrency0);

                commState.idx++;

            case 10:
                byte[] dhtVersBytes0 = commState.getByteArray();

                if (dhtVersBytes0 == BYTE_ARR_NOT_READ)
                    return false;

                dhtVersBytes = dhtVersBytes0;

                commState.idx++;

            case 11:
                byte[] grpLockKeyBytes0 = commState.getByteArray();

                if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ)
                    return false;

                grpLockKeyBytes = grpLockKeyBytes0;

                commState.idx++;

            case 12:
                if (buf.remaining() < 1)
                    return false;

                invalidate = commState.getBoolean();

                commState.idx++;

            case 13:
                if (buf.remaining() < 1)
                    return false;

                byte isolation0 = commState.getByte();

                isolation = IgniteTxIsolation.fromOrdinal(isolation0);

                commState.idx++;

            case 14:
                if (buf.remaining() < 1)
                    return false;

                partLock = commState.getBoolean();

                commState.idx++;

            case 15:
                // readSize == -1 means the collection size has not been read yet.
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (readsBytes == null)
                        readsBytes = new ArrayList<>(commState.readSize);

                    // readItems keeps the number of elements already read between calls.
                    for (int i = commState.readItems; i < commState.readSize; i++) {
                        byte[] _val = commState.getByteArray();

                        if (_val == BYTE_ARR_NOT_READ)
                            return false;

                        readsBytes.add((byte[])_val);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

            case 16:
                if (buf.remaining() < 8)
                    return false;

                threadId = commState.getLong();

                commState.idx++;

            case 17:
                if (buf.remaining() < 8)
                    return false;

                timeout = commState.getLong();

                commState.idx++;

            case 18:
                byte[] txNodesBytes0 = commState.getByteArray();

                if (txNodesBytes0 == BYTE_ARR_NOT_READ)
                    return false;

                txNodesBytes = txNodesBytes0;

                commState.idx++;

            case 19:
                if (buf.remaining() < 4)
                    return false;

                txSize = commState.getInt();

                commState.idx++;

            case 20:
                // readSize == -1 means the collection size has not been read yet.
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (writesBytes == null)
                        writesBytes = new ArrayList<>(commState.readSize);

                    // readItems keeps the number of elements already read between calls.
                    for (int i = commState.readItems; i < commState.readSize; i++) {
                        byte[] _val = commState.getByteArray();

                        if (_val == BYTE_ARR_NOT_READ)
                            return false;

                        writesBytes.add((byte[])_val);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

            case 21:
                if (buf.remaining() < 1)
                    return false;

                sys = commState.getBoolean();

                commState.idx++;
        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 26;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
            "super", super.toString());
    }
}
// Origin: Apache Ignite (incubating) commit b89b472d; file added as new (blob index 0000000..e809846).
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.nio.*;
import java.util.*;

/**
 * Response to prepare request.
 * <p>
 * Carries an optional map of lock candidates and an optional error. Both are marked
 * {@code @GridDirectTransient}: {@link #prepareMarshal} serializes them into {@code candsBytes} /
 * {@code errBytes}, which {@link #writeTo} / {@link #readFrom} transfer, and
 * {@link #finishUnmarshal} restores the objects.
 */
public class GridDistributedTxPrepareResponse<K, V> extends GridDistributedBaseMessage<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Collections of local lock candidates. */
    @GridToStringInclude
    @GridDirectTransient
    private Map<K, Collection<GridCacheMvccCandidate<K>>> cands;

    /** Serialized candidates map. */
    private byte[] candsBytes;

    /** Error. */
    @GridToStringExclude
    @GridDirectTransient
    private Throwable err;

    /** Serialized error. */
    private byte[] errBytes;

    /**
     * Empty constructor (required by {@link Externalizable}).
     */
    public GridDistributedTxPrepareResponse() {
        /* No-op. */
    }

    /**
     * @param xid Transaction ID.
     */
    public GridDistributedTxPrepareResponse(GridCacheVersion xid) {
        super(xid, 0);
    }

    /**
     * @param xid Transaction ID.
     * @param err Error.
     */
    public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err) {
        super(xid, 0);

        this.err = err;
    }

    /**
     * @return Error.
     */
    public Throwable error() {
        return err;
    }

    /**
     * @param err Error to set.
     */
    public void error(Throwable err) {
        this.err = err;
    }

    /**
     * @return Rollback flag: {@code true} if an error is attached to this response.
     */
    public boolean isRollback() {
        return err != null;
    }

    /**
     * @param cands Candidates map to set.
     */
    public void candidates(Map<K, Collection<GridCacheMvccCandidate<K>>> cands) {
        this.cands = cands;
    }

    /** {@inheritDoc} */
    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
        super.prepareMarshal(ctx);

        // Candidates are serialized only once; existing bytes are reused.
        if (candsBytes == null && cands != null) {
            if (ctx.deploymentEnabled()) {
                for (K k : cands.keySet())
                    prepareObject(k, ctx);
            }

            candsBytes = CU.marshal(ctx, cands);
        }

        // The error is re-serialized on every call, so a later error(Throwable) update is picked up.
        if (err != null)
            errBytes = ctx.marshaller().marshal(err);
    }

    /** {@inheritDoc} */
    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
        super.finishUnmarshal(ctx, ldr);

        if (candsBytes != null && cands == null)
            cands = ctx.marshaller().unmarshal(candsBytes, ldr);

        if (errBytes != null)
            err = ctx.marshaller().unmarshal(errBytes, ldr);
    }

    /**
     * @param key Candidates key.
     * @return Collection of lock candidates mapped to the given key, or {@code null} if no
     *      candidates map is set or the map has no mapping for the key.
     */
    @Nullable public Collection<GridCacheMvccCandidate<K>> candidatesForKey(K key) {
        assert key != null;

        if (cands == null)
            return null;

        return cands.get(key);
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
        "OverriddenMethodCallDuringObjectConstruction"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridDistributedTxPrepareResponse<K, V> _clone = new GridDistributedTxPrepareResponse<>();

        clone0(_clone);

        return _clone;
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        // The target is always created by clone() of this class or of a subclass.
        @SuppressWarnings("unchecked")
        GridDistributedTxPrepareResponse<K, V> _clone = (GridDistributedTxPrepareResponse<K, V>)_msg;

        // Field-by-field copy: the map, the error and the byte arrays are shared by reference.
        _clone.cands = cands;
        _clone.candsBytes = candsBytes;
        _clone.err = err;
        _clone.errBytes = errBytes;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Resumable write: {@code commState.idx} selects the next field and every case falls through
     * on purpose. A {@code false} return means the field did not fit into the buffer; the next
     * call continues from the same field. The case order must match {@link #readFrom(ByteBuffer)}.
     */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.writeTo(buf))
            return false;

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        switch (commState.idx) {
            case 8:
                if (!commState.putByteArray(candsBytes))
                    return false;

                commState.idx++;

            case 9:
                if (!commState.putByteArray(errBytes))
                    return false;

                commState.idx++;

        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Resumable read mirroring {@link #writeTo(ByteBuffer)}. A {@code false} return means the
     * buffer does not yet hold the whole field; the next call continues from the same field.
     */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.readFrom(buf))
            return false;

        switch (commState.idx) {
            case 8:
                byte[] candsBytes0 = commState.getByteArray();

                if (candsBytes0 == BYTE_ARR_NOT_READ)
                    return false;

                candsBytes = candsBytes0;

                commState.idx++;

            case 9:
                byte[] errBytes0 = commState.getByteArray();

                if (errBytes0 == BYTE_ARR_NOT_READ)
                    return false;

                errBytes = errBytes0;

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 27;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return GridToStringBuilder.toString(GridDistributedTxPrepareResponse.class, this, "err",
            err == null ? "null" : err.toString(), "super", super.toString());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java new file mode 100644 index 0000000..424f543 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -0,0 +1,775 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.transactions.IgniteTxState.*; +import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; +import static org.apache.ignite.internal.processors.dr.GridDrType.*; + +/** + * Transaction created by system implicitly on remote nodes. + */ +public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> + implements IgniteTxRemoteEx<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Read set. */ + @GridToStringInclude + protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap; + + /** Write map. */ + @GridToStringInclude + protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap; + + /** Remote thread ID. */ + @GridToStringInclude + private long rmtThreadId; + + /** Explicit versions. */ + @GridToStringInclude + private List<GridCacheVersion> explicitVers; + + /** Started flag. */ + @GridToStringInclude + private boolean started; + + /** {@code True} only if all write entries are locked by this transaction. */ + @GridToStringInclude + private AtomicBoolean commitAllowed = new AtomicBoolean(false); + + /** + * Empty constructor required for {@link Externalizable}. 
+ */ + public GridDistributedTxRemoteAdapter() { + // No-op. + } + + /** + * @param ctx Cache registry. + * @param nodeId Node ID. + * @param rmtThreadId Remote thread ID. + * @param xidVer XID version. + * @param commitVer Commit version. + * @param sys System flag. + * @param concurrency Concurrency level (should be pessimistic). + * @param isolation Transaction isolation. + * @param invalidate Invalidate flag. + * @param timeout Timeout. + * @param txSize Expected transaction size. + * @param grpLockKey Group lock key if this is a group-lock transaction. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + */ + public GridDistributedTxRemoteAdapter( + GridCacheSharedContext<K, V> ctx, + UUID nodeId, + long rmtThreadId, + GridCacheVersion xidVer, + GridCacheVersion commitVer, + boolean sys, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, + boolean invalidate, + long timeout, + int txSize, + @Nullable IgniteTxKey grpLockKey, + @Nullable UUID subjId, + int taskNameHash + ) { + super( + ctx, + nodeId, + xidVer, + ctx.versions().last(), + Thread.currentThread().getId(), + sys, + concurrency, + isolation, + timeout, + txSize, + grpLockKey, + subjId, + taskNameHash); + + this.rmtThreadId = rmtThreadId; + this.invalidate = invalidate; + + commitVersion(commitVer); + + // Must set started flag after concurrency and isolation. + started = true; + } + + /** {@inheritDoc} */ + @Override public UUID eventNodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public Collection<UUID> masterNodeIds() { + return Collections.singleton(nodeId); + } + + /** {@inheritDoc} */ + @Override public UUID originatingNodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public Collection<Integer> activeCacheIds() { + return Collections.emptyList(); + } + + /** + * @return Checks if transaction has no entries. 
+ */ + @Override public boolean empty() { + return readMap.isEmpty() && writeMap.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public boolean removed(IgniteTxKey<K> key) { + IgniteTxEntry e = writeMap.get(key); + + return e != null && e.op() == DELETE; + } + + /** {@inheritDoc} */ + @Override public void invalidate(boolean invalidate) { + this.invalidate = invalidate; + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap() { + return writeMap; + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap() { + return readMap; + } + + /** {@inheritDoc} */ + @Override public void seal() { + // No-op. + } + + /** + * Adds group lock key to remote transaction. + * + * @param key Key. + */ + public void groupLockKey(IgniteTxKey key) { + if (grpLockKey == null) + grpLockKey = key; + } + + /** {@inheritDoc} */ + @Override public GridTuple<V> peek(GridCacheContext<K, V> cacheCtx, boolean failFast, K key, + IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws GridCacheFilterFailedException { + assert false : "Method peek can only be called on user transaction: " + this; + + throw new IllegalStateException("Method peek can only be called on user transaction: " + this); + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key) { + IgniteTxEntry<K, V> e = writeMap == null ? null : writeMap.get(key); + + if (e == null) + e = readMap == null ? null : readMap.get(key); + + return e; + } + + /** + * Clears entry from transaction as it never happened. + * + * @param key key to be removed. + */ + public void clearEntry(IgniteTxKey<K> key) { + readMap.remove(key); + writeMap.remove(key); + } + + /** + * @param baseVer Base version. + * @param committedVers Committed versions. + * @param rolledbackVers Rolled back versions. 
*/
    @Override public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers,
        Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) {
        // Apply the completed versions to every read entry, then to every write entry.
        // Null or empty maps are skipped.
        if (readMap != null && !readMap.isEmpty()) {
            for (IgniteTxEntry<K, V> txEntry : readMap.values())
                doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers);
        }

        if (writeMap != null && !writeMap.isEmpty()) {
            for (IgniteTxEntry<K, V> txEntry : writeMap.values())
                doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers);
        }
    }

    /**
     * Adds completed versions to an entry. If the cached entry turns out to be removed,
     * it is replaced with a fresh one from the cache and the call is retried.
     *
     * @param txEntry Entry.
     * @param baseVer Base version for completed versions.
     * @param committedVers Completed versions relative to base version.
     * @param rolledbackVers Rolled back versions relative to base version.
     * @param pendingVers Pending versions.
     */
    private void doneRemote(IgniteTxEntry<K, V> txEntry, GridCacheVersion baseVer,
        Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers,
        Collection<GridCacheVersion> pendingVers) {
        while (true) {
            GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();

            try {
                // Handle explicit locks: the entry's explicit version, when present, takes precedence over xidVer.
                GridCacheVersion doneVer = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer;

                entry.doneRemote(doneVer, baseVer, pendingVers, committedVers, rolledbackVers, isSystemInvalidate());

                break;
            }
            catch (GridCacheEntryRemovedException ignored) {
                assert entry.obsoleteVersion() != null;

                if (log.isDebugEnabled())
                    log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + ", tx=" + this + ']');

                // Replace the entry and loop again.
                txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * When the entry's key is one of this transaction's write keys, a commit is attempted via
     * {@code commitIfLocked()} and {@code true} is returned. A commit failure is logged, not
     * rethrown, and yields {@code false}.
     */
    @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
        try {
            if (hasWriteKey(entry.txKey())) {
                commitIfLocked();

                return true;
            }
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to commit remote transaction: " + this, e);
        }

        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean isStarted() {
        return started;
    }

    /**
     * @return Remote node thread ID.
     */
    @Override public long remoteThreadId() {
        return rmtThreadId;
    }

    /**
     * Registers a write entry in this transaction. When no write entry exists for the key, the given
     * entry is put into the write map (reusing the cached entry of a read entry with the same key, if
     * there was one). Otherwise the values of the given entry are copied into the existing write entry.
     *
     * @param e Transaction entry to set.
     * @return {@code True} if value was set (this implementation always returns {@code true}).
     */
    @Override public boolean setWriteValue(IgniteTxEntry<K, V> e) {
        checkInternal(e.txKey());

        IgniteTxEntry<K, V> entry = writeMap.get(e.txKey());

        if (entry == null) {
            // A read entry for the same key is promoted to a write entry.
            IgniteTxEntry<K, V> rmv = readMap.remove(e.txKey());

            if (rmv != null) {
                e.cached(rmv.cached(), rmv.keyBytes());

                writeMap.put(e.txKey(), e);
            }
            // If lock is explicit.
            else {
                e.cached(e.context().cache().entryEx(e.key()), null);

                // explicit lock.
                writeMap.put(e.txKey(), e);
            }
        }
        else {
            // Copy values.
            entry.value(e.value(), e.hasWriteValue(), e.hasReadValue());
            entry.entryProcessors(e.entryProcessors());
            entry.valueBytes(e.valueBytes());
            entry.op(e.op());
            entry.ttl(e.ttl());
            entry.explicitVersion(e.explicitVersion());
            entry.groupLockEntry(e.groupLockEntry());

            // DR stuff.
            entry.drVersion(e.drVersion());
            entry.drExpireTime(e.drExpireTime());
        }

        addExplicit(e);

        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean hasWriteKey(IgniteTxKey<K> key) {
        return writeMap.containsKey(key);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Not expected to be called: fails an assertion and returns {@code null} when assertions are disabled.
     */
    @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() {
        assert false;
        return null;
    }

    /** {@inheritDoc} */
    @Override public Set<IgniteTxKey<K>> readSet() {
        return readMap.keySet();
    }

    /** {@inheritDoc} */
    @Override public Set<IgniteTxKey<K>> writeSet() {
        return writeMap.keySet();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Write entries come first, followed by read entries.
     */
    @Override public Collection<IgniteTxEntry<K, V>> allEntries() {
        return F.concat(false, writeEntries(), readEntries());
    }

    /** {@inheritDoc} */
    @Override public Collection<IgniteTxEntry<K, V>> writeEntries() {
        return writeMap.values();
    }

    /** {@inheritDoc} */
    @Override public Collection<IgniteTxEntry<K, V>> readEntries() {
        return readMap.values();
    }

    /**
     * Prepare phase. Returns silently (after a debug log) when the transaction cannot be moved to
     * {@code PREPARING}, except for an optimistic transaction that is already {@code PREPARING}.
     * On failure the transaction is marked rollback-only and the exception is rethrown.
     *
     * @throws IgniteCheckedException If prepare failed.
     */
    @Override public void prepare() throws IgniteCheckedException {
        // If another thread is doing prepare or rollback.
        if (!state(PREPARING)) {
            // In optimistic mode prepare may be called multiple times.
            if(state() != PREPARING || !optimistic()) {
                if (log.isDebugEnabled())
                    log.debug("Invalid transaction state for prepare: " + this);

                return;
            }
        }

        try {
            cctx.tm().prepareTx(this);

            // The state moves to PREPARED here only for pessimistic or system-invalidate transactions.
            if (pessimistic() || isSystemInvalidate())
                state(PREPARED);
        }
        catch (IgniteCheckedException e) {
            setRollbackOnly();

            throw e;
        }
    }

    /**
     * @throws IgniteCheckedException If commit failed.
*/
    // Commits only when the state is COMMITTING and every write entry is either a group-lock
    // entry or locked by this transaction's version (explicit version when present, else xidVer);
    // otherwise returns without committing. The commitAllowed CAS lets a single thread commit.
    @SuppressWarnings({"CatchGenericClass"})
    private void commitIfLocked() throws IgniteCheckedException {
        if (state() == COMMITTING) {
            for (IgniteTxEntry<K, V> txEntry : writeMap.values()) {
                assert txEntry != null : "Missing transaction entry for tx: " + this;

                while (true) {
                    GridCacheEntryEx<K, V> cacheEntry = txEntry.cached();

                    assert cacheEntry != null : "Missing cached entry for transaction entry: " + txEntry;

                    try {
                        GridCacheVersion ver = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer;

                        // If locks haven't been acquired yet, keep waiting.
                        if (!txEntry.groupLockEntry() && !cacheEntry.lockedBy(ver)) {
                            if (log.isDebugEnabled())
                                log.debug("Transaction does not own lock for entry (will wait) [entry=" + cacheEntry +
                                    ", tx=" + this + ']');

                            return;
                        }

                        break; // While.
                    }
                    catch (GridCacheEntryRemovedException ignore) {
                        if (log.isDebugEnabled())
                            log.debug("Got removed entry while committing (will retry): " + txEntry);

                        // Swap in a fresh cache entry and re-check the lock.
                        txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
                    }
                }
            }

            // Only one thread gets to commit.
            if (commitAllowed.compareAndSet(false, true)) {
                IgniteCheckedException err = null;

                if (!F.isEmpty(writeMap)) {
                    // Register this transaction as completed prior to write-phase to
                    // ensure proper lock ordering for removed entries.
                    cctx.tm().addCommittedTx(this);

                    long topVer = topologyVersion();

                    // Note that for near transactions we grab all entries.
                    for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) {
                        GridCacheContext<K, V> cacheCtx = txEntry.context();

                        boolean replicate = cacheCtx.isDrEnabled();

                        try {
                            while (true) {
                                try {
                                    GridCacheEntryEx<K, V> cached = txEntry.cached();

                                    if (cached == null)
                                        txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key()), null);

                                    // Near entry with DR receive enabled: just mark obsolete, nothing is written.
                                    if (near() && cacheCtx.dr().receiveEnabled()) {
                                        cached.markObsolete(xidVer);

                                        break;
                                    }

                                    GridNearCacheEntry<K, V> nearCached = null;

                                    if (updateNearCache(cacheCtx, txEntry.key(), topVer))
                                        nearCached = cacheCtx.dht().near().peekExx(txEntry.key());

                                    if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
                                        txEntry.cached().unswap(true, false);

                                    GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
                                        false);

                                    GridCacheOperation op = res.get1();
                                    V val = res.get2();
                                    byte[] valBytes = res.get3();

                                    GridCacheVersion explicitVer = txEntry.drVersion();

                                    if (finalizationStatus() == FinalizationStatus.RECOVERY_FINISH || optimistic()) {
                                        // Primary node has left the grid so we have to process conflicts on backups.
                                        if (explicitVer == null)
                                            explicitVer = writeVersion(); // Force write version to be used.

                                        GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached,
                                            txEntry,
                                            explicitVer,
                                            op,
                                            val,
                                            valBytes,
                                            txEntry.ttl(),
                                            txEntry.drExpireTime());

                                        if (drRes != null) {
                                            // The resolver's outcome replaces the operation and value to apply.
                                            op = drRes.operation();
                                            val = drRes.value();
                                            valBytes = drRes.valueBytes();

                                            if (drRes.isMerge())
                                                explicitVer = writeVersion();
                                            else if (op == NOOP)
                                                txEntry.ttl(-1L);
                                        }
                                        else
                                            // Nullify explicit version so that innerSet/innerRemove will work as usual.
                                            explicitVer = null;
                                    }

                                    if (op == CREATE || op == UPDATE) {
                                        // Invalidate only for near nodes (backups cannot be invalidated).
                                        if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
                                            cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
                                                topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE,
                                                near() ? null : explicitVer, CU.subjectId(this, cctx),
                                                resolveTaskName());
                                        else {
                                            cached.innerSet(this, eventNodeId(), nodeId, val, valBytes, false, false,
                                                txEntry.ttl(), true, true, topVer, txEntry.filters(),
                                                replicate ? DR_BACKUP : DR_NONE, txEntry.drExpireTime(),
                                                near() ? null : explicitVer, CU.subjectId(this, cctx),
                                                resolveTaskName());

                                            // Keep near entry up to date.
                                            if (nearCached != null) {
                                                V val0 = null;
                                                byte[] valBytes0 = null;

                                                GridCacheValueBytes valBytesTuple = cached.valueBytes();

                                                if (!valBytesTuple.isNull()) {
                                                    if (valBytesTuple.isPlain())
                                                        val0 = (V)valBytesTuple.get();
                                                    else
                                                        valBytes0 = valBytesTuple.get();
                                                }
                                                else
                                                    val0 = cached.rawGet();

                                                nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
                                                    cached.ttl(), nodeId);
                                            }
                                        }
                                    }
                                    else if (op == DELETE) {
                                        cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
                                            topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE,
                                            near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName());

                                        // Keep near entry up to date.
                                        if (nearCached != null)
                                            nearCached.updateOrEvict(xidVer, null, null, 0, 0, nodeId);
                                    }
                                    else if (op == RELOAD) {
                                        V reloaded = cached.innerReload(CU.<K, V>empty());

                                        if (nearCached != null) {
                                            nearCached.innerReload(CU.<K, V>empty());

                                            nearCached.updateOrEvict(cached.version(), reloaded, null,
                                                cached.expireTime(), cached.ttl(), nodeId);
                                        }
                                    }
                                    else if (op == READ) {
                                        assert near();

                                        if (log.isDebugEnabled())
                                            log.debug("Ignoring READ entry when committing: " + txEntry);
                                    }
                                    // No-op.
                                    else {
                                        assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()):
                                            "Transaction does not own lock for group lock entry during commit [tx=" +
                                                this + ", txEntry=" + txEntry + ']';

                                        // A TTL of -1 means "leave the TTL alone" here.
                                        if (txEntry.ttl() != -1L)
                                            cached.updateTtl(null, txEntry.ttl());

                                        if (nearCached != null) {
                                            V val0 = null;
                                            byte[] valBytes0 = null;

                                            GridCacheValueBytes valBytesTuple = cached.valueBytes();

                                            if (!valBytesTuple.isNull()) {
                                                if (valBytesTuple.isPlain())
                                                    val0 = (V)valBytesTuple.get();
                                                else
                                                    valBytes0 = valBytesTuple.get();
                                            }
                                            else
                                                val0 = cached.rawGet();

                                            nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
                                                cached.ttl(), nodeId);
                                        }
                                    }

                                    // Assert after setting values, so that the check also covers
                                    // entries that replaced removed ones.
                                    assert
                                        txEntry.op() == READ || onePhaseCommit() ||
                                            // If candidate is not there, then lock was explicit
                                            // and we simply allow the commit to proceed.
                                            !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) :
                                        "Transaction does not own lock for commit [entry=" + cached +
                                            ", tx=" + this + ']';

                                    // Break out of while loop.
                                    break;
                                }
                                catch (GridCacheEntryRemovedException ignored) {
                                    if (log.isDebugEnabled())
                                        log.debug("Attempting to commit a removed entry (will retry): " + txEntry);

                                    // Renew cached entry.
                                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes());
                                }
                            }
                        }
                        catch (Throwable ex) {
                            state(UNKNOWN);

                            // In case of error, we still make the best effort to commit,
                            // as there is no way to rollback at this point.
                            // Only the most recent failure is kept; the loop continues with the next entry.
                            err = ex instanceof IgniteCheckedException ? (IgniteCheckedException)ex :
                                new IgniteCheckedException("Commit produced a runtime exception: " + this, ex);
                        }
                    }
                }

                if (err != null) {
                    state(UNKNOWN);

                    throw err;
                }

                cctx.tm().commitTx(this);

                state(COMMITTED);
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * If the state cannot be moved to {@code COMMITTING} and no other thread is committing or has
     * committed, the transaction is marked rollback-only; it then either throws (regular
     * transactions) or rolls back (system-invalidate transactions).
     */
    @Override public void commit() throws IgniteCheckedException {
        if (optimistic())
            state(PREPARED);

        if (!state(COMMITTING)) {
            IgniteTxState state = state();

            // If other thread is doing commit, then no-op.
            if (state == COMMITTING || state == COMMITTED)
                return;

            if (log.isDebugEnabled())
                log.debug("Failed to set COMMITTING transaction state (will rollback): " + this);

            setRollbackOnly();

            if (!isSystemInvalidate())
                throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');

            rollback();
        }

        commitIfLocked();
    }

    /**
     * Forces commit for this tx. Delegates to {@code commitIfLocked()}, so nothing is committed
     * unless the state is {@code COMMITTING} and the lock checks there pass.
     *
     * @throws IgniteCheckedException If commit failed.
     */
    public void forceCommit() throws IgniteCheckedException {
        commitIfLocked();
    }

    /**
     * {@inheritDoc}
     * <p>
     * The commit runs synchronously in the calling thread; the returned future is already finished,
     * either with this transaction or with the commit failure.
     */
    @Override public IgniteFuture<IgniteTx> commitAsync() {
        try {
            commit();

            return new GridFinishedFutureEx<IgniteTx>(this);
        }
        catch (IgniteCheckedException e) {
            return new GridFinishedFutureEx<>(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Does nothing when the state cannot be moved to {@code ROLLING_BACK}. An unchecked failure
     * moves the state to {@code UNKNOWN} and is rethrown.
     */
    @SuppressWarnings({"CatchGenericClass"})
    @Override public void rollback() {
        try {
            // Note that we don't evict near entries here -
            // they will be deleted by their corresponding transactions.
            if (state(ROLLING_BACK)) {
                cctx.tm().rollbackTx(this);

                state(ROLLED_BACK);
            }
        }
        catch (RuntimeException | Error e) {
            state(UNKNOWN);

            throw e;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The rollback runs synchronously; the returned future is already finished.
     */
    @Override public IgniteFuture<IgniteTx> rollbackAsync() {
        rollback();

        return new GridFinishedFutureEx<IgniteTx>(this);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns the explicit versions collected by {@code addExplicit}, or an empty list when there are none.
     */
    @Override public Collection<GridCacheVersion> alternateVersions() {
        return explicitVers == null ? Collections.<GridCacheVersion>emptyList() : explicitVers;
    }

    /**
     * Adds explicit version if there is one. A version not seen before is recorded
     * and registered with the transaction manager as an alternate version of this transaction.
     *
     * @param e Transaction entry.
     */
    protected void addExplicit(IgniteTxEntry<K, V> e) {
        if (e.explicitVersion() != null) {
            // The list is created lazily on the first explicit version.
            if (explicitVers == null)
                explicitVers = new LinkedList<>();

            if (!explicitVers.contains(e.explicitVersion())) {
                explicitVers.add(e.explicitVersion());

                if (log.isDebugEnabled())
                    log.debug("Added explicit version to transaction [explicitVer=" + e.explicitVersion() +
                        ", tx=" + this + ']');

                // Register alternate version with TM.
                cctx.tm().addAlternateVersion(e.explicitVersion(), this);
            }
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
new file mode 100644
index 0000000..9e475d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -0,0 +1,239 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;
import java.nio.*;
import java.util.*;

/**
 * Unlock request message. Carries the keys to unlock both as objects ({@link #keys()},
 * not sent directly) and in serialized form ({@link #keyBytes()}).
 */
public class GridDistributedUnlockRequest<K, V> extends GridDistributedBaseMessage<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Serialized keys to unlock. */
    @GridDirectCollection(byte[].class)
    private List<byte[]> keyBytes;

    /** Keys. */
    @GridDirectTransient
    private List<K> keys;

    /**
     * Empty constructor required by {@link Externalizable}.
     */
    public GridDistributedUnlockRequest() {
        /* No-op. */
    }

    /**
     * @param cacheId Cache ID.
     * @param keyCnt Key count.
     */
    public GridDistributedUnlockRequest(int cacheId, int keyCnt) {
        super(keyCnt);

        this.cacheId = cacheId;
    }

    /**
     * @return Serialized keys to unlock.
     */
    public List<byte[]> keyBytes() {
        return keyBytes;
    }

    /**
     * @return Keys.
     */
    public List<K> keys() {
        return keys;
    }

    /**
     * Adds a key and its serialized form to this request. Both lists are created lazily,
     * sized by {@code keysCount()}.
     *
     * @param key Key.
     * @param bytes Key bytes.
     * @param ctx Context.
     * @throws IgniteCheckedException If failed.
     */
    public void addKey(K key, byte[] bytes, GridCacheContext<K, V> ctx) throws IgniteCheckedException {
        boolean depEnabled = ctx.deploymentEnabled();

        // The key is only prepared when deployment is enabled for the cache.
        if (depEnabled)
            prepareObject(key, ctx.shared());

        if (keys == null)
            keys = new ArrayList<>(keysCount());

        keys.add(key);

        if (keyBytes == null)
            keyBytes = new ArrayList<>(keysCount());

        keyBytes.add(bytes);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Marshals the keys into {@code keyBytes} unless key bytes are already present.
     */
    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
        super.prepareMarshal(ctx);

        if (F.isEmpty(keyBytes) && !F.isEmpty(keys))
            keyBytes = marshalCollection(keys, ctx);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Unmarshals {@code keyBytes} into keys unless keys are already present.
     */
    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
        super.finishUnmarshal(ctx, ldr);

        if (keys == null && !F.isEmpty(keyBytes))
            keys = unmarshalCollection(keyBytes, ctx, ldr);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Creates a new instance through the empty constructor and copies state via {@link #clone0}.
     */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
        "OverriddenMethodCallDuringObjectConstruction"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridDistributedUnlockRequest _clone = new GridDistributedUnlockRequest();

        clone0(_clone);

        return _clone;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The key lists are shared by reference with the clone, not copied.
     */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        GridDistributedUnlockRequest _clone = (GridDistributedUnlockRequest)_msg;

        _clone.keyBytes = keyBytes;
        _clone.keys = keys;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Writes the superclass state, then the message type, then field 8 ({@code keyBytes}) as a size
     * followed by each array, or {@code -1} for a {@code null} list. Progress is kept in
     * {@code commState}; {@code false} is returned as soon as a write step reports failure
     * (presumably because the buffer is full, so that the call can be repeated - TODO confirm).
     */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.writeTo(buf))
            return false;

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        switch (commState.idx) {
            case 8:
                if (keyBytes != null) {
                    // A null iterator means the collection size has not been written yet.
                    if (commState.it == null) {
                        if (!commState.putInt(keyBytes.size()))
                            return false;

                        commState.it = keyBytes.iterator();
                    }

                    // commState.cur holds the element whose write is still pending.
                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putByteArray((byte[])commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                } else {
                    if (!commState.putInt(-1))
                        return false;
                }

                commState.idx++;

        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads the superclass state, then field 8 ({@code keyBytes}): a size followed by that many
     * byte arrays; a negative size leaves {@code keyBytes} untouched. Progress is kept in
     * {@code commState}; {@code false} is returned when the size or an array cannot be read yet.
     */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.readFrom(buf))
            return false;

        switch (commState.idx) {
            case 8:
                // readSize == -1 means the collection size has not been read yet.
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (keyBytes == null)
                        keyBytes = new ArrayList<>(commState.readSize);

                    // Resume from the number of items already read by previous calls.
                    for (int i = commState.readItems; i < commState.readSize; i++) {
                        byte[] _val = commState.getByteArray();

                        if (_val == BYTE_ARR_NOT_READ)
                            return false;

                        keyBytes.add((byte[])_val);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 28;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridDistributedUnlockRequest.class, this, "keyBytesSize",
            keyBytes == null ? 0 : keyBytes.size(), "super", super.toString());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java
new file mode 100644
index 0000000..63eb41f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java
@@ -0,0 +1,423 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.distributed;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.cache.GridCachePeekMode.*;

/**
 * Partitioned cache entry public API. Operations are routed to the DHT entry and,
 * when the context is a near one, to the near entry for the same key.
 */
public class GridPartitionedCacheEntryImpl<K, V> extends GridCacheEntryImpl<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /**
     * Empty constructor required for {@link Externalizable}.
     */
    public GridPartitionedCacheEntryImpl() {
        // No-op.
    }

    /**
     * @param nearPrj Parent projection or {@code null} if entry belongs to default cache.
     * @param ctx Near cache context.
     * @param key key.
     * @param cached Cached entry (either from near or dht cache map).
     */
    public GridPartitionedCacheEntryImpl(GridCacheProjectionImpl<K, V> nearPrj, GridCacheContext<K, V> ctx, K key,
        @Nullable GridCacheEntryEx<K, V> cached) {
        super(nearPrj, ctx, key, cached);

        assert !this.ctx.isDht() || ctx.isColocated();
    }

    /**
     * @return Dht cache: the colocated cache for a colocated context, the context's own DHT cache
     *      for an atomic DHT context, otherwise the DHT cache behind the near cache.
     */
    public GridDhtCacheAdapter<K, V> dht() {
        return ctx.isColocated() ? ctx.colocated() : ctx.isDhtAtomic() ? ctx.dht() : ctx.near().dht();
    }

    /**
     * @return Near cache.
     */
    public GridNearCacheAdapter<K, V> near() {
        return ctx.near();
    }

    /**
     * {@inheritDoc}
     * <p>
     * With {@code NEAR_ONLY} on a near context only the near entry is peeked. Otherwise the
     * superclass peek is tried first (unless {@code PARTITIONED_ONLY} is requested) and the DHT
     * entry is used as a fallback. A {@code null} mode collection is handled like one that
     * contains neither of these two modes.
     */
    @Override public V peek(@Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException {
        // 'modes' is declared @Nullable, so it must not be dereferenced unconditionally.
        if (modes != null && modes.contains(NEAR_ONLY) && ctx.isNear())
            return peekNear0(modes, CU.<K, V>empty());

        V val = null;

        if (modes == null || !modes.contains(PARTITIONED_ONLY))
            val = super.peek(modes);

        if (val == null)
            val = peekDht0(modes, CU.<K, V>empty());

        return val;
    }

    /**
     * Peeks the value from the DHT entry in {@code SMART} mode.
     *
     * @param filter Filter.
     * @return Peeked value.
     */
    @Nullable public V peekDht(@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
        try {
            return peekDht0(SMART, filter);
        }
        catch (IgniteCheckedException e) {
            // Should never happen.
            throw new IgniteException("Unable to perform entry peek() operation.", e);
        }
    }

    /**
     * Peeks the near entry with each mode in turn and returns the first non-null value.
     * Empty or {@code null} modes mean {@code SMART}.
     *
     * @param modes Peek modes.
     * @param filter Optional entry filter.
     * @return Peeked value.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable private V peekNear0(@Nullable Collection<GridCachePeekMode> modes,
        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
        if (F.isEmpty(modes))
            return peekNear0(SMART, filter);

        assert modes != null;

        for (GridCachePeekMode mode : modes) {
            V val = peekNear0(mode, filter);

            if (val != null)
                return val;
        }

        return null;
    }

    /**
     * Peeks the near entry with a single mode, retrying while the entry is concurrently removed.
     *
     * @param mode Peek mode ({@code null} means {@code SMART}).
     * @param filter Optional entry filter.
     * @return Peeked value.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings({"unchecked"})
    @Nullable private V peekNear0(@Nullable GridCachePeekMode mode,
        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
        if (mode == null)
            mode = SMART;

        while (true) {
            GridCacheProjectionImpl<K, V> prjPerCall = proxy.gateProjection();

            // Combine the projection predicate with the caller's filter.
            if (prjPerCall != null)
                filter = ctx.vararg(F0.and(ctx.vararg(proxy.predicate()), filter));

            GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prjPerCall);

            try {
                GridCacheEntryEx<K, V> entry = near().peekEx(key);

                return entry == null ? null : ctx.cloneOnFlag(entry.peek(mode, filter));
            }
            catch (GridCacheEntryRemovedException ignore) {
                // No-op: loop and peek again.
            }
            finally {
                ctx.gate().leave(prev);
            }
        }
    }

    /**
     * Peeks the DHT entry with each mode in turn and returns the first non-null value.
     * Empty or {@code null} modes mean {@code SMART}.
     *
     * @param modes Peek modes.
     * @param filter Optional entry filter.
     * @return Peeked value.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable private V peekDht0(@Nullable Collection<GridCachePeekMode> modes,
        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
        if (F.isEmpty(modes))
            return peekDht0(SMART, filter);

        assert modes != null;

        for (GridCachePeekMode mode : modes) {
            V val = peekDht0(mode, filter);

            if (val != null)
                return val;
        }

        return null;
    }

    /**
     * Peeks the DHT entry with a single mode, retrying while the entry is concurrently removed.
     *
     * @param mode Peek mode ({@code null} means {@code SMART}).
     * @param filter Optional entry filter.
     * @return Peeked value.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings({"unchecked"})
    @Nullable private V peekDht0(@Nullable GridCachePeekMode mode,
        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
        if (mode == null)
            mode = SMART;

        while (true) {
            GridCacheProjectionImpl<K, V> prjPerCall = proxy.gateProjection();

            // Combine the projection predicate with the caller's filter.
            if (prjPerCall != null)
                filter = ctx.vararg(F0.and(ctx.vararg(proxy.predicate()), filter));

            GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prjPerCall);

            try {
                GridCacheEntryEx<K, V> entry = dht().peekEx(key);

                if (entry == null)
                    return null;
                else {
                    GridTuple<V> peek = entry.peek0(false, mode, filter, ctx.tm().localTxx());

                    return peek != null ? ctx.cloneOnFlag(peek.get()) : null;
                }
            }
            catch (GridCacheEntryRemovedException ignore) {
                // No-op: loop and peek again.
            }
            catch (GridCacheFilterFailedException e) {
                // Not expected, since peek0 is called with failFast == false.
                e.printStackTrace();

                assert false;

                return null;
            }
            finally {
                ctx.gate().leave(prev);
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The DHT entry is used when the local node is an affinity node for the key; otherwise the near
     * entry (near context) or a detached DHT entry is returned.
     */
    @Override protected GridCacheEntryEx<K, V> entryEx(boolean touch, long topVer) {
        try {
            return ctx.affinity().localNode(key, topVer) ? dht().entryEx(key, touch) :
                ctx.isNear() ? near().entryEx(key, touch) :
                    new GridDhtDetachedCacheEntry<>(ctx, key, 0, null, null, 0, 0);
        }
        catch (GridDhtInvalidPartitionException ignore) {
            return ctx.isNear() ? near().entryEx(key) :
                new GridDhtDetachedCacheEntry<>(ctx, key, 0, null, null, 0, 0);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The DHT entry is peeked when the local node is an affinity node for the key; otherwise the
     * near entry (near context) is peeked or {@code null} is returned.
     */
    @Override protected GridCacheEntryEx<K, V> peekEx(long topVer) {
        try {
            return ctx.affinity().localNode(key, topVer) ? dht().peekEx(key) :
                ctx.isNear() ? near().peekEx(key) : null;
        }
        catch (GridDhtInvalidPartitionException ignore) {
            return ctx.isNear() ? near().peekEx(key) : null;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Applied to the DHT entry (if present) and, on a near context, to the near entry, which is
     * created when there is no DHT entry. The DHT result wins when it is non-null.
     */
    @Override public <V1> V1 addMeta(String name, V1 val) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = de.addMeta(name, val);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
                near().entryExx(key, ctx.affinity().affinityTopologyVersion());

            if (ne != null) {
                V1 v1 = ne.addMeta(name, val);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The DHT entry's value wins; the near entry's value is used only when the DHT one is {@code null}.
     */
    @SuppressWarnings( {"RedundantCast"})
    @Override public <V1> V1 meta(String name) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = (V1)de.meta(name);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = near().peekExx(key);

            if (ne != null) {
                V1 v1 = (V1)ne.meta(name);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Applied to the DHT entry (if present) and, on a near context, to the near entry, which is
     * created when there is no DHT entry. The DHT result wins when it is non-null.
     */
    @SuppressWarnings( {"RedundantCast"})
    @Override public <V1> V1 putMetaIfAbsent(String name, Callable<V1> c) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = (V1)de.putMetaIfAbsent(name, c);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
                near().entryExx(key, ctx.affinity().affinityTopologyVersion());

            if (ne != null) {
                V1 v1 = (V1)ne.putMetaIfAbsent(name, c);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Applied to the DHT entry (if present) and, on a near context, to the near entry, which is
     * created when there is no DHT entry. The DHT result wins when it is non-null.
     */
    @SuppressWarnings( {"RedundantCast"})
    @Override public <V1> V1 putMetaIfAbsent(String name, V1 val) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = (V1)de.putMetaIfAbsent(name, val);

        // Touch the near cache only on a near context, like the Callable overload
        // and the other meta methods of this class do.
        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
                near().entryExx(key, ctx.affinity().affinityTopologyVersion());

            if (ne != null) {
                V1 v1 = (V1)ne.putMetaIfAbsent(name, val);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Removed from both the DHT entry and, on a near context, the near entry; the DHT result wins
     * when it is non-null.
     */
    @SuppressWarnings( {"RedundantCast"})
    @Override public <V1> V1 removeMeta(String name) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = (V1)de.removeMeta(name);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = near().peekExx(key);

            if (ne != null) {
                V1 v1 = (V1)ne.removeMeta(name);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code true} when the removal succeeded on the DHT entry or on the near entry.
     */
    @Override public <V1> boolean removeMeta(String name, V1 val) {
        boolean b = false;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            b = de.removeMeta(name, val);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = near().peekExx(key);

            if (ne != null)
                b |= ne.removeMeta(name, val);
        }

        return b;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code true} when the replacement succeeded on the DHT entry or on the near entry.
     */
    @Override public <V1> boolean replaceMeta(String name, V1 curVal, V1 newVal) {
        boolean b = false;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            b = de.replaceMeta(name, curVal, newVal);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = near().peekExx(key);

            if (ne != null)
                b |= ne.replaceMeta(name, curVal, newVal);
        }

        return b;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridPartitionedCacheEntryImpl.class, this, super.toString());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java
new file mode 100644
index 0000000..e9b748c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java
@@ -0,0 +1,174 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import javax.cache.expiry.*;
import java.io.*;
import java.util.concurrent.*;

/**
 * Externalizable wrapper for {@link ExpiryPolicy}.
 * <p>
 * The three durations of the wrapped policy are captured once, when the wrapper is created, and
 * are what the getters return and what gets serialized. This makes a wrapper usable both before
 * and after a serialization round trip, and allows a received wrapper to be serialized again.
 * <p>
 * Wire format: one flags byte telling which durations are present, followed by one {@code long}
 * per present duration in the order create, update, access.
 */
public class IgniteExternalizableExpiryPolicy implements ExpiryPolicy, Externalizable, IgniteOptimizedMarshallable {
    /** */
    private static final long serialVersionUID = 0L;

    /** */
    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
    private static Object GG_CLASS_ID;

    /** Flag bit: duration for creation is present. */
    private static final byte CREATE_TTL_MASK = 0x01;

    /** Flag bit: duration for update is present. */
    private static final byte UPDATE_TTL_MASK = 0x02;

    /** Flag bit: duration for access is present. */
    private static final byte ACCESS_TTL_MASK = 0x04;

    /** Duration for creation, may be {@code null}. */
    private Duration forCreate;

    /** Duration for update, may be {@code null}. */
    private Duration forUpdate;

    /** Duration for access, may be {@code null}. */
    private Duration forAccess;

    /**
     * Required by {@link Externalizable}.
     */
    public IgniteExternalizableExpiryPolicy() {
        // No-op.
    }

    /**
     * Captures the durations of the given policy.
     *
     * @param plc Expiry policy.
     */
    public IgniteExternalizableExpiryPolicy(ExpiryPolicy plc) {
        assert plc != null;

        // Snapshot the durations so that the getters work without a serialization round trip
        // and writeExternal() does not depend on a reference that is absent after readExternal().
        forCreate = plc.getExpiryForCreation();
        forUpdate = plc.getExpiryForUpdate();
        forAccess = plc.getExpiryForAccess();
    }

    /** {@inheritDoc} */
    @Override public Object ggClassId() {
        return GG_CLASS_ID;
    }

    /** {@inheritDoc} */
    @Override public Duration getExpiryForCreation() {
        return forCreate;
    }

    /** {@inheritDoc} */
    @Override public Duration getExpiryForAccess() {
        return forAccess;
    }

    /** {@inheritDoc} */
    @Override public Duration getExpiryForUpdate() {
        return forUpdate;
    }

    /**
     * Writes a duration as a single {@code long}: {@code 0} for an eternal duration, {@code 1} for
     * a zero-length duration, otherwise the duration in milliseconds. Nothing is written for
     * {@code null}.
     *
     * @param out Output stream.
     * @param duration Duration.
     * @throws IOException If failed.
     */
    private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
        if (duration != null) {
            if (duration.isEternal())
                out.writeLong(0);
            else if (duration.getDurationAmount() == 0)
                out.writeLong(1); // 0 is taken by "eternal", so zero is sent as one millisecond.
            else
                out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
        }
    }

    /**
     * Reads a duration written by {@link #writeDuration(ObjectOutput, Duration)}.
     *
     * @param in Input stream.
     * @return Duration: {@link Duration#ETERNAL} for {@code 0}, otherwise the value in milliseconds.
     * @throws IOException If failed.
     */
    private Duration readDuration(ObjectInput in) throws IOException {
        long ttl = in.readLong();

        assert ttl >= 0;

        if (ttl == 0)
            return Duration.ETERNAL;

        return new Duration(TimeUnit.MILLISECONDS, ttl);
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        byte flags = 0;

        if (forCreate != null)
            flags |= CREATE_TTL_MASK;

        if (forUpdate != null)
            flags |= UPDATE_TTL_MASK;

        if (forAccess != null)
            flags |= ACCESS_TTL_MASK;

        out.writeByte(flags);

        // Absent durations are skipped by writeDuration(), matching the flags above.
        writeDuration(out, forCreate);

        writeDuration(out, forUpdate);

        writeDuration(out, forAccess);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        byte flags = in.readByte();

        if ((flags & CREATE_TTL_MASK) != 0)
            forCreate = readDuration(in);

        if ((flags & UPDATE_TTL_MASK) != 0)
            forUpdate = readDuration(in);

        if ((flags & ACCESS_TTL_MASK) != 0)
            forAccess = readDuration(in);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IgniteExternalizableExpiryPolicy.class, this);
    }
}