http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java new file mode 100644 index 0000000..30c2330 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -0,0 +1,833 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.nio.*; +import java.util.*; + +/** + * Lock request message. + */ +public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Sender node ID. */ + private UUID nodeId; + + /** Near transaction version. */ + private GridCacheVersion nearXidVer; + + /** Thread ID. */ + private long threadId; + + /** Future ID. */ + private IgniteUuid futId; + + /** Max wait timeout. */ + private long timeout; + + /** Indicates whether lock is obtained within a scope of transaction. */ + private boolean isInTx; + + /** Invalidate flag for transactions. */ + private boolean isInvalidate; + + /** Indicates whether implicit lock so for read or write operation. */ + private boolean isRead; + + /** Transaction isolation. */ + private IgniteTxIsolation isolation; + + /** Key bytes for keys to lock. */ + @GridDirectCollection(byte[].class) + private List<byte[]> keyBytes; + + /** Keys. */ + @GridDirectTransient + private List<K> keys; + + /** Write entries. */ + @GridToStringInclude + @GridDirectTransient + private List<IgniteTxEntry<K, V>> writeEntries; + + /** Serialized write entries. */ + private byte[] writeEntriesBytes; + + /** Array indicating whether value should be returned for a key. */ + @GridToStringInclude + private boolean[] retVals; + + /** Key-bytes index. */ + @GridDirectTransient + protected int idx; + + /** Key count. */ + private int txSize; + + /** Group lock key if this is a group-lock transaction. */ + @GridDirectTransient + private IgniteTxKey grpLockKey; + + /** Group lock key bytes. */ + private byte[] grpLockKeyBytes; + + /** Partition lock flag. Only if group-lock transaction. */ + private boolean partLock; + + /** DR versions. */ + @GridToStringInclude + private GridCacheVersion[] drVersByIdx; + + /** + * Empty constructor. + */ + public GridDistributedLockRequest() { + /* No-op. */ + } + + /** + * @param nodeId Node ID. + * @param nearXidVer Near transaction ID. + * @param threadId Thread ID. + * @param futId Future ID. + * @param lockVer Cache version. + * @param isInTx {@code True} if implicit transaction lock. + * @param isRead Indicates whether implicit lock is for read or write operation. + * @param isolation Transaction isolation. + * @param isInvalidate Invalidation flag. + * @param timeout Lock timeout. + * @param keyCnt Number of keys. + * @param txSize Expected transaction size. + * @param grpLockKey Group lock key if this is a group-lock transaction. + * @param partLock {@code True} if this is a group-lock transaction request and whole partition is + * locked. + */ + public GridDistributedLockRequest( + int cacheId, + UUID nodeId, + @Nullable GridCacheVersion nearXidVer, + long threadId, + IgniteUuid futId, + GridCacheVersion lockVer, + boolean isInTx, + boolean isRead, + IgniteTxIsolation isolation, + boolean isInvalidate, + long timeout, + int keyCnt, + int txSize, + @Nullable IgniteTxKey grpLockKey, + boolean partLock + ) { + super(lockVer, keyCnt); + + assert keyCnt > 0; + assert futId != null; + assert !isInTx || isolation != null; + + this.cacheId = cacheId; + this.nodeId = nodeId; + this.nearXidVer = nearXidVer; + this.threadId = threadId; + this.futId = futId; + this.isInTx = isInTx; + this.isRead = isRead; + this.isolation = isolation; + this.isInvalidate = isInvalidate; + this.timeout = timeout; + this.txSize = txSize; + this.grpLockKey = grpLockKey; + this.partLock = partLock; + + retVals = new boolean[keyCnt]; + } + + /** + * + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Near transaction ID. + */ + public GridCacheVersion nearXidVersion() { + return nearXidVer; + } + + /** + * + * @return Owner node thread ID. + */ + public long threadId() { + return threadId; + } + + /** + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return {@code True} if implicit transaction lock. + */ + public boolean inTx() { + return isInTx; + } + + /** + * @return Invalidate flag. + */ + public boolean isInvalidate() { + return isInvalidate; + } + + /** + * @return {@code True} if lock is implicit and for a read operation. + */ + public boolean txRead() { + return isRead; + } + + /** + * @param idx Key index. + * @return Flag indicating whether a value should be returned. + */ + public boolean returnValue(int idx) { + return retVals[idx]; + } + + /** + * @return Return flags. + */ + public boolean[] returnFlags() { + return retVals; + } + + /** + * @return Transaction isolation or <tt>null</tt> if not in transaction. + */ + public IgniteTxIsolation isolation() { + return isolation; + } + + /** + * + * @return Key to lock. + */ + public List<byte[]> keyBytes() { + return keyBytes; + } + + /** + * @return Write entries list. + */ + public List<IgniteTxEntry<K, V>> writeEntries() { + return writeEntries; + } + + /** + * @return Tx size. + */ + public int txSize() { + return txSize; + } + + /** + * Adds a key. + * + * @param key Key. + * @param retVal Flag indicating whether value should be returned. + * @param keyBytes Key bytes. + * @param writeEntry Write entry. + * @param cands Candidates. + * @param drVer DR version. + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void addKeyBytes( + K key, + @Nullable byte[] keyBytes, + @Nullable IgniteTxEntry<K, V> writeEntry, + boolean retVal, + @Nullable Collection<GridCacheMvccCandidate<K>> cands, + @Nullable GridCacheVersion drVer, + GridCacheContext<K, V> ctx + ) throws IgniteCheckedException { + if (ctx.deploymentEnabled()) + prepareObject(key, ctx.shared()); + + if (keyBytes != null) { + if (this.keyBytes == null) + this.keyBytes = new ArrayList<>(keysCount()); + + this.keyBytes.add(keyBytes); + } + + if (keys == null) + keys = new ArrayList<>(keysCount()); + + keys.add(key); + + candidatesByIndex(idx, cands); + drVersionByIndex(idx, drVer); + + retVals[idx] = retVal; + + if (writeEntry != null) { + if (writeEntries == null) { + assert idx == 0 : "Cannot start adding write entries in the middle of lock message [idx=" + idx + + ", writeEntry=" + writeEntry + ']'; + + writeEntries = new ArrayList<>(keysCount()); + } + + writeEntries.add(writeEntry); + } + + idx++; + } + + /** + * @return Unmarshalled keys. + */ + public List<K> keys() { + return keys; + } + + /** + * @return {@code True} if lock request for group-lock transaction. + */ + public boolean groupLock() { + return grpLockKey != null; + } + + /** + * @return Group lock key. + */ + @Nullable public IgniteTxKey groupLockKey() { + return grpLockKey; + } + + /** + * @return {@code True} if partition is locked in group-lock transaction. + */ + public boolean partitionLock() { + return partLock; + } + + /** + * @return Max lock wait time. + */ + public long timeout() { + return timeout; + } + + /** + * @param idx Key index. + * @param drVer DR version. + */ + @SuppressWarnings({"unchecked"}) + public void drVersionByIndex(int idx, GridCacheVersion drVer) { + assert idx < keysCount(); + + // If nothing to add. + if (drVer == null) + return; + + if (drVersByIdx == null) + drVersByIdx = new GridCacheVersion[keysCount()]; + + drVersByIdx[idx] = drVer; + } + + /** + * @param idx Key index. + * @return DR versions for given key. + */ + public GridCacheVersion drVersionByIndex(int idx) { + return drVersByIdx == null ? null : drVersByIdx[idx]; + } + + /** + * @return All DR versions. + */ + public GridCacheVersion[] drVersions() { + return drVersByIdx; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (grpLockKey != null && grpLockKeyBytes == null) { + if (ctx.deploymentEnabled()) + prepareObject(grpLockKey, ctx); + + grpLockKeyBytes = CU.marshal(ctx, grpLockKey); + } + + if (writeEntries != null) { + marshalTx(writeEntries, ctx); + + writeEntriesBytes = ctx.marshaller().marshal(writeEntries); + } + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (keys == null) + keys = unmarshalCollection(keyBytes, ctx, ldr); + + if (grpLockKey == null && grpLockKeyBytes != null) + grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr); + + if (writeEntriesBytes != null) { + writeEntries = ctx.marshaller().unmarshal(writeEntriesBytes, ldr); + + unmarshalTx(writeEntries, false, ctx, ldr); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneCallsConstructors", "OverriddenMethodCallDuringObjectConstruction", + "CloneDoesntCallSuperClone"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDistributedLockRequest _clone = new GridDistributedLockRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDistributedLockRequest _clone = (GridDistributedLockRequest)_msg; + + _clone.nodeId = nodeId; + _clone.nearXidVer = nearXidVer; + _clone.threadId = threadId; + _clone.futId = futId; + _clone.timeout = timeout; + _clone.isInTx = isInTx; + _clone.isInvalidate = isInvalidate; + _clone.isRead = isRead; + _clone.isolation = isolation; + _clone.keyBytes = keyBytes; + _clone.keys = keys; + _clone.writeEntries = writeEntries; + _clone.writeEntriesBytes = writeEntriesBytes; + _clone.retVals = retVals; + _clone.idx = idx; + _clone.txSize = txSize; + _clone.grpLockKey = grpLockKey; + _clone.grpLockKeyBytes = grpLockKeyBytes; + _clone.partLock = partLock; + _clone.drVersByIdx = drVersByIdx; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 8: + if (drVersByIdx != null) { + if (commState.it == null) { + if (!commState.putInt(drVersByIdx.length)) + return false; + + commState.it = arrayIterator(drVersByIdx); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 9: + if (!commState.putGridUuid(futId)) + return false; + + commState.idx++; + + case 10: + if (!commState.putByteArray(grpLockKeyBytes)) + return false; + + commState.idx++; + + case 11: + if (!commState.putBoolean(isInTx)) + return false; + + commState.idx++; + + case 12: + if (!commState.putBoolean(isInvalidate)) + return false; + + commState.idx++; + + case 13: + if (!commState.putBoolean(isRead)) + return false; + + commState.idx++; + + case 14: + if (!commState.putEnum(isolation)) + return false; + + commState.idx++; + + case 15: + if (keyBytes != null) { + if (commState.it == null) { + if (!commState.putInt(keyBytes.size())) + return false; + + commState.it = keyBytes.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putByteArray((byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 16: + if (!commState.putCacheVersion(nearXidVer)) + return false; + + commState.idx++; + + case 17: + if (!commState.putUuid(nodeId)) + return false; + + commState.idx++; + + case 18: + if (!commState.putBoolean(partLock)) + return false; + + commState.idx++; + + case 19: + if (!commState.putBooleanArray(retVals)) + return false; + + commState.idx++; + + case 20: + if (!commState.putLong(threadId)) + return false; + + commState.idx++; + + case 21: + if (!commState.putLong(timeout)) + return false; + + commState.idx++; + + case 22: + if (!commState.putInt(txSize)) + return false; + + commState.idx++; + + case 23: + if (!commState.putByteArray(writeEntriesBytes)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 8: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (drVersByIdx == null) + drVersByIdx = new GridCacheVersion[commState.readSize]; + + for (int i = commState.readItems; i < commState.readSize; i++) { + GridCacheVersion _val = commState.getCacheVersion(); + + if (_val == CACHE_VER_NOT_READ) + return false; + + drVersByIdx[i] = (GridCacheVersion)_val; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 9: + IgniteUuid futId0 = commState.getGridUuid(); + + if (futId0 == GRID_UUID_NOT_READ) + return false; + + futId = futId0; + + commState.idx++; + + case 10: + byte[] grpLockKeyBytes0 = commState.getByteArray(); + + if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ) + return false; + + grpLockKeyBytes = grpLockKeyBytes0; + + commState.idx++; + + case 11: + if (buf.remaining() < 1) + return false; + + isInTx = commState.getBoolean(); + + commState.idx++; + + case 12: + if (buf.remaining() < 1) + return false; + + isInvalidate = commState.getBoolean(); + + commState.idx++; + + case 13: + if (buf.remaining() < 1) + return false; + + isRead = commState.getBoolean(); + + commState.idx++; + + case 14: + if (buf.remaining() < 1) + return false; + + byte isolation0 = commState.getByte(); + + isolation = IgniteTxIsolation.fromOrdinal(isolation0); + + commState.idx++; + + case 15: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (keyBytes == null) + keyBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + keyBytes.add((byte[])_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 16: + GridCacheVersion nearXidVer0 = commState.getCacheVersion(); + + if (nearXidVer0 == CACHE_VER_NOT_READ) + return false; + + nearXidVer = nearXidVer0; + + commState.idx++; + + case 17: + UUID nodeId0 = commState.getUuid(); + + if (nodeId0 == UUID_NOT_READ) + return false; + + nodeId = nodeId0; + + commState.idx++; + + case 18: + if (buf.remaining() < 1) + return false; + + partLock = commState.getBoolean(); + + commState.idx++; + + case 19: + boolean[] retVals0 = commState.getBooleanArray(); + + if (retVals0 == BOOLEAN_ARR_NOT_READ) + return false; + + retVals = retVals0; + + commState.idx++; + + case 20: + if (buf.remaining() < 8) + return false; + + threadId = commState.getLong(); + + commState.idx++; + + case 21: + if (buf.remaining() < 8) + return false; + + timeout = commState.getLong(); + + commState.idx++; + + case 22: + if (buf.remaining() < 4) + return false; + + txSize = commState.getInt(); + + commState.idx++; + + case 23: + byte[] writeEntriesBytes0 = commState.getByteArray(); + + if (writeEntriesBytes0 == BYTE_ARR_NOT_READ) + return false; + + writeEntriesBytes = writeEntriesBytes0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 22; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDistributedLockRequest.class, this, "keysCnt", retVals.length, + "super", super.toString()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java new file mode 100644 index 0000000..99d5997 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * Lock response message. + */ +public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Future ID. */ + private IgniteUuid futId; + + /** Error. */ + @GridDirectTransient + private Throwable err; + + /** Serialized error. */ + private byte[] errBytes; + + /** Value bytes. */ + @GridDirectCollection(GridCacheValueBytes.class) + private List<GridCacheValueBytes> valBytes; + + /** Values. */ + @GridToStringInclude + @GridDirectTransient + private List<V> vals; + + /** + * Empty constructor (required by {@link Externalizable}). + */ + public GridDistributedLockResponse() { + /* No-op. */ + } + + /** + * @param cacheId Cache ID. + * @param lockVer Lock version. + * @param futId Future ID. + * @param cnt Key count. + */ + public GridDistributedLockResponse(int cacheId, + GridCacheVersion lockVer, + IgniteUuid futId, + int cnt) { + super(lockVer, cnt); + + assert futId != null; + + this.cacheId = cacheId; + this.futId = futId; + + vals = new ArrayList<>(cnt); + valBytes = new ArrayList<>(cnt); + } + + /** + * @param cacheId Cache ID. + * @param lockVer Lock ID. + * @param futId Future ID. + * @param err Error. + */ + public GridDistributedLockResponse(int cacheId, + GridCacheVersion lockVer, + IgniteUuid futId, + Throwable err) { + super(lockVer, 0); + + assert futId != null; + + this.cacheId = cacheId; + this.futId = futId; + this.err = err; + } + + /** + * @param cacheId Cache ID. + * @param lockVer Lock ID. + * @param futId Future ID. + * @param cnt Count. + * @param err Error. + */ + public GridDistributedLockResponse(int cacheId, + GridCacheVersion lockVer, + IgniteUuid futId, + int cnt, + Throwable err) { + super(lockVer, cnt); + + assert futId != null; + + this.cacheId = cacheId; + this.futId = futId; + this.err = err; + + vals = new ArrayList<>(cnt); + valBytes = new ArrayList<>(cnt); + } + + /** + * + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return Error. + */ + public Throwable error() { + return err; + } + + /** + * @param err Error to set. + */ + public void error(Throwable err) { + this.err = err; + } + + /** + * @param idx Index of locked flag. + * @return Value of locked flag at given index. + */ + public boolean isCurrentlyLocked(int idx) { + assert idx >= 0; + + Collection<GridCacheMvccCandidate<K>> cands = candidatesByIndex(idx); + + for (GridCacheMvccCandidate<K> cand : cands) + if (cand.owner()) + return true; + + return false; + } + + /** + * @param idx Candidates index. + * @param cands Collection of candidates. + * @param committedVers Committed versions relative to lock version. + * @param rolledbackVers Rolled back versions relative to lock version. + */ + public void setCandidates(int idx, Collection<GridCacheMvccCandidate<K>> cands, + Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { + assert idx >= 0; + + completedVersions(committedVers, rolledbackVers); + + candidatesByIndex(idx, cands); + } + + /** + * @param idx Value index. + * + * @return Value bytes (possibly {@code null}). + */ + @Nullable public byte[] valueBytes(int idx) { + if (!F.isEmpty(valBytes)) { + GridCacheValueBytes res = valBytes.get(idx); + + if (res != null && !res.isPlain()) + return res.get(); + } + + return null; + } + + /** + * @param val Value. + * @param valBytes Value bytes (possibly {@code null}). + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void addValueBytes(V val, @Nullable byte[] valBytes, GridCacheContext<K, V> ctx) throws IgniteCheckedException { + if (ctx.deploymentEnabled()) + prepareObject(val, ctx.shared()); + + GridCacheValueBytes vb = null; + + if (val != null) { + vb = val instanceof byte[] ? GridCacheValueBytes.plain(val) : valBytes != null ? + GridCacheValueBytes.marshaled(valBytes) : null; + } + else if (valBytes != null) + vb = GridCacheValueBytes.marshaled(valBytes); + + this.valBytes.add(vb); + + vals.add(val); + } + + /** + * @return Values size. + */ + protected int valuesSize() { + return vals.size(); + } + + /** + * @param idx Index. + * @return Value for given index. + */ + @Nullable public V value(int idx) { + if (!F.isEmpty(vals)) { + V res = vals.get(idx); + + if (res != null) + return res; + } + + // If there was no value in values collection, then it could be in value bytes collection in case of byte[]. + if (!F.isEmpty(valBytes)) { + GridCacheValueBytes res = valBytes.get(idx); + + if (res != null && res.isPlain()) + return (V)res.get(); + } + + // Value is not found in both value and value bytes collections. + return null; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (F.isEmpty(valBytes) && !F.isEmpty(vals)) + valBytes = marshalValuesCollection(vals, ctx); + + if (err != null) + errBytes = ctx.marshaller().marshal(err); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (F.isEmpty(vals) && !F.isEmpty(valBytes)) + vals = unmarshalValueBytesCollection(valBytes, ctx, ldr); + + if (errBytes != null) + err = ctx.marshaller().unmarshal(errBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors", + "OverriddenMethodCallDuringObjectConstruction"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDistributedLockResponse _clone = new GridDistributedLockResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDistributedLockResponse _clone = (GridDistributedLockResponse)_msg; + + _clone.futId = futId; + _clone.err = err; + _clone.errBytes = errBytes; + _clone.valBytes = valBytes; + _clone.vals = vals; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 8: + if (!commState.putByteArray(errBytes)) + return false; + + commState.idx++; + + case 9: + if (!commState.putGridUuid(futId)) + return false; + + commState.idx++; + + case 10: + if (valBytes != null) { + if (commState.it == null) { + if (!commState.putInt(valBytes.size())) + return false; + + commState.it = valBytes.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putValueBytes((GridCacheValueBytes)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 8: + byte[] errBytes0 = commState.getByteArray(); + + if (errBytes0 == BYTE_ARR_NOT_READ) + return false; + + errBytes = errBytes0; + + commState.idx++; + + case 9: + IgniteUuid futId0 = commState.getGridUuid(); + + if (futId0 == GRID_UUID_NOT_READ) + return false; + + futId = futId0; + + commState.idx++; + + case 10: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (valBytes == null) + valBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + GridCacheValueBytes _val = commState.getValueBytes(); + + if (_val == VAL_BYTES_NOT_READ) + return false; + + valBytes.add((GridCacheValueBytes)_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 23; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDistributedLockResponse.class, this, + "valBytesLen", valBytes == null ? 0 : valBytes.size(), + "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java new file mode 100644 index 0000000..d34b2b6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -0,0 +1,695 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * Transaction completion message. + */ +public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Future ID. */ + private IgniteUuid futId; + + /** Thread ID. */ + private long threadId; + + /** Commit version. */ + private GridCacheVersion commitVer; + + /** Invalidate flag. */ + private boolean invalidate; + + /** Commit flag. */ + private boolean commit; + + /** Sync commit flag. */ + private boolean syncCommit; + + /** Sync commit flag. */ + private boolean syncRollback; + + /** Min version used as base for completed versions. */ + private GridCacheVersion baseVer; + + /** Transaction write entries. */ + @GridToStringInclude + @GridDirectTransient + private Collection<IgniteTxEntry<K, V>> writeEntries; + + /** */ + @GridDirectCollection(byte[].class) + private Collection<byte[]> writeEntriesBytes; + + /** Write entries which have not been transferred to nodes during lock request. */ + @GridToStringInclude + @GridDirectTransient + private Collection<IgniteTxEntry<K, V>> recoveryWrites; + + /** */ + @GridDirectCollection(byte[].class) + private Collection<byte[]> recoveryWritesBytes; + + /** Expected txSize. */ + private int txSize; + + /** Group lock key. */ + @GridDirectTransient + private IgniteTxKey grpLockKey; + + /** Group lock key bytes. */ + private byte[] grpLockKeyBytes; + + /** System flag. */ + private boolean sys; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridDistributedTxFinishRequest() { + /* No-op. */ + } + + /** + * @param xidVer Transaction ID. + * @param futId future ID. + * @param threadId Thread ID. + * @param commitVer Commit version. + * @param commit Commit flag. + * @param invalidate Invalidate flag. + * @param sys System flag. + * @param baseVer Base version. + * @param committedVers Committed versions. + * @param rolledbackVers Rolled back versions. + * @param txSize Expected transaction size. + * @param writeEntries Write entries. + * @param recoveryWrites Recover entries. In pessimistic mode entries which were not transferred to remote nodes + * with lock requests. {@code Null} for optimistic mode. + * @param grpLockKey Group lock key if this is a group-lock transaction. + */ + public GridDistributedTxFinishRequest( + GridCacheVersion xidVer, + IgniteUuid futId, + @Nullable GridCacheVersion commitVer, + long threadId, + boolean commit, + boolean invalidate, + boolean sys, + boolean syncCommit, + boolean syncRollback, + GridCacheVersion baseVer, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers, + int txSize, + Collection<IgniteTxEntry<K, V>> writeEntries, + Collection<IgniteTxEntry<K, V>> recoveryWrites, + @Nullable IgniteTxKey grpLockKey + ) { + super(xidVer, writeEntries == null ? 0 : writeEntries.size()); + assert xidVer != null; + + this.futId = futId; + this.commitVer = commitVer; + this.threadId = threadId; + this.commit = commit; + this.invalidate = invalidate; + this.sys = sys; + this.syncCommit = syncCommit; + this.syncRollback = syncRollback; + this.baseVer = baseVer; + this.txSize = txSize; + this.writeEntries = writeEntries; + this.recoveryWrites = recoveryWrites; + this.grpLockKey = grpLockKey; + + completedVersions(committedVers, rolledbackVers); + } + + /** + * Clones write entries so that near entries are not passed to DHT cache. + */ + public void cloneEntries() { + if (F.isEmpty(writeEntries)) + return; + + Collection<IgniteTxEntry<K, V>> cp = new ArrayList<>(writeEntries.size()); + + for (IgniteTxEntry<K, V> e : writeEntries) { + GridCacheContext<K, V> cacheCtx = e.context(); + + // Clone only if it is a near cache. + if (cacheCtx.isNear()) + cp.add(e.cleanCopy(cacheCtx.nearTx().dht().context())); + else + cp.add(e); + } + + writeEntries = cp; + } + + /** + * @return System flag. + */ + public boolean system() { + return sys; + } + + /** + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return Thread ID. + */ + public long threadId() { + return threadId; + } + + /** + * @return Commit version. + */ + public GridCacheVersion commitVersion() { + return commitVer; + } + + /** + * @return Commit flag. + */ + public boolean commit() { + return commit; + } + + /** + * + * @return Invalidate flag. + */ + public boolean isInvalidate() { + return invalidate; + } + + /** + * @return Sync commit flag. + */ + public boolean syncCommit() { + return syncCommit; + } + + /** + * @return Sync rollback flag. + */ + public boolean syncRollback() { + return syncRollback; + } + + /** + * @return Base version. + */ + public GridCacheVersion baseVersion() { + return baseVer; + } + + /** + * @return Write entries. + */ + public Collection<IgniteTxEntry<K, V>> writes() { + return writeEntries; + } + + /** + * @return Recover entries. + */ + public Collection<IgniteTxEntry<K, V>> recoveryWrites() { + return recoveryWrites; + } + + /** + * @return Expected tx size. + */ + public int txSize() { + return txSize; + } + + /** + * + * @return {@code True} if reply is required. + */ + public boolean replyRequired() { + return commit ? syncCommit : syncRollback; + } + + /** + * @return {@code True} if group lock transaction. + */ + public boolean groupLock() { + return grpLockKey != null; + } + + /** + * @return Group lock key. + */ + @Nullable public IgniteTxKey groupLockKey() { + return grpLockKey; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (writeEntries != null) { + marshalTx(writeEntries, ctx); + + writeEntriesBytes = new ArrayList<>(writeEntries.size()); + + for (IgniteTxEntry<K, V> e : writeEntries) + writeEntriesBytes.add(ctx.marshaller().marshal(e)); + } + + if (recoveryWrites != null) { + marshalTx(recoveryWrites, ctx); + + recoveryWritesBytes = new ArrayList<>(recoveryWrites.size()); + + for (IgniteTxEntry<K, V> e : recoveryWrites) + recoveryWritesBytes.add(ctx.marshaller().marshal(e)); + } + + if (grpLockKey != null && grpLockKeyBytes == null) { + if (ctx.deploymentEnabled()) + prepareObject(grpLockKey, ctx); + + grpLockKeyBytes = CU.marshal(ctx, grpLockKey); + } + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (writeEntriesBytes != null) { + writeEntries = new ArrayList<>(writeEntriesBytes.size()); + + for (byte[] arr : writeEntriesBytes) + writeEntries.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr)); + + unmarshalTx(writeEntries, false, ctx, ldr); + } + + if (recoveryWritesBytes != null) { + recoveryWrites = new ArrayList<>(recoveryWritesBytes.size()); + + for (byte[] arr : recoveryWritesBytes) + recoveryWrites.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr)); + + unmarshalTx(recoveryWrites, false, ctx, ldr); + } + + if (grpLockKeyBytes != null && grpLockKey == null) + grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors", + "OverriddenMethodCallDuringObjectConstruction"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDistributedTxFinishRequest _clone = new GridDistributedTxFinishRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDistributedTxFinishRequest _clone = (GridDistributedTxFinishRequest)_msg; + + _clone.futId = futId; + _clone.threadId = threadId; + _clone.commitVer = commitVer; + _clone.invalidate = invalidate; + _clone.commit = commit; + _clone.baseVer = baseVer; + _clone.writeEntries = writeEntries; + _clone.writeEntriesBytes = writeEntriesBytes; + _clone.recoveryWrites = recoveryWrites; + _clone.recoveryWritesBytes = recoveryWritesBytes; + _clone.txSize = txSize; + _clone.grpLockKey = grpLockKey; + _clone.grpLockKeyBytes = grpLockKeyBytes; + _clone.sys = sys; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 8: + if (!commState.putCacheVersion(baseVer)) + return false; + + commState.idx++; + + case 9: + if (!commState.putBoolean(commit)) + return false; + + commState.idx++; + + case 10: + if (!commState.putCacheVersion(commitVer)) + return false; + + commState.idx++; + + case 11: + if (!commState.putGridUuid(futId)) + return false; + + commState.idx++; + + case 12: + if (!commState.putByteArray(grpLockKeyBytes)) + return false; + + commState.idx++; + + case 13: + if (!commState.putBoolean(invalidate)) + return false; + + commState.idx++; + + case 14: + if (!commState.putBoolean(syncCommit)) + return false; + + commState.idx++; + + case 15: + if (!commState.putBoolean(syncRollback)) + return false; + + commState.idx++; + + case 16: + if (recoveryWritesBytes != null) { + if (commState.it == null) { + if (!commState.putInt(recoveryWritesBytes.size())) + return false; + + commState.it = recoveryWritesBytes.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putByteArray((byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 17: + if (!commState.putLong(threadId)) + return false; + + commState.idx++; + + case 18: + if (!commState.putInt(txSize)) + return false; + + commState.idx++; + + case 19: + if (writeEntriesBytes != null) { + if (commState.it == null) { + if (!commState.putInt(writeEntriesBytes.size())) + return false; + + commState.it = writeEntriesBytes.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putByteArray((byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 20: + if (!commState.putBoolean(sys)) + return false; + + commState.idx++; + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 8: + GridCacheVersion baseVer0 = commState.getCacheVersion(); + + if (baseVer0 == CACHE_VER_NOT_READ) + return false; + + baseVer = baseVer0; + + commState.idx++; + + case 9: + if (buf.remaining() < 1) + return false; + + commit = commState.getBoolean(); + + commState.idx++; + + case 10: + GridCacheVersion commitVer0 = commState.getCacheVersion(); + + if (commitVer0 == CACHE_VER_NOT_READ) + return false; + + commitVer = commitVer0; + + commState.idx++; + + case 11: + IgniteUuid futId0 = commState.getGridUuid(); + + if (futId0 == GRID_UUID_NOT_READ) + return false; + + futId = futId0; + + commState.idx++; + + case 12: + byte[] grpLockKeyBytes0 = commState.getByteArray(); + + if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ) + return false; + + grpLockKeyBytes = grpLockKeyBytes0; + + commState.idx++; + + case 13: + if (buf.remaining() < 1) + return false; + + invalidate = commState.getBoolean(); + + commState.idx++; + + case 14: + if (buf.remaining() < 1) + return false; + + syncCommit = commState.getBoolean(); + + commState.idx++; + + case 15: + if (buf.remaining() < 1) + return false; + + syncRollback = commState.getBoolean(); + + commState.idx++; + + case 16: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (recoveryWritesBytes == null) + recoveryWritesBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + recoveryWritesBytes.add((byte[])_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 17: + if (buf.remaining() < 8) + return false; + + threadId = commState.getLong(); + + commState.idx++; + + case 18: + if (buf.remaining() < 4) + return false; + + txSize = commState.getInt(); + + commState.idx++; + + case 19: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (writeEntriesBytes == null) + writeEntriesBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + writeEntriesBytes.add((byte[])_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 20: + if (buf.remaining() < 1) + return false; + + sys = commState.getBoolean(); + + commState.idx++; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 24; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return GridToStringBuilder.toString(GridDistributedTxFinishRequest.class, this, + "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java new file mode 100644 index 0000000..b6ce1c1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.tostring.*; + +import java.io.*; +import java.nio.*; + +/** + * Transaction finish response. + */ +public class GridDistributedTxFinishResponse<K, V> extends GridCacheMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private GridCacheVersion txId; + + /** Future ID. */ + private IgniteUuid futId; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridDistributedTxFinishResponse() { + /* No-op. */ + } + + /** + * @param txId Transaction id. + * @param futId Future ID. + */ + public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) { + assert txId != null; + assert futId != null; + + this.txId = txId; + this.futId = futId; + } + + /** + * + * @return Transaction id. + */ + public GridCacheVersion xid() { + return txId; + } + + /** + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors", + "OverriddenMethodCallDuringObjectConstruction"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDistributedTxFinishResponse _clone = new GridDistributedTxFinishResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDistributedTxFinishResponse _clone = (GridDistributedTxFinishResponse)_msg; + + _clone.txId = txId; + _clone.futId = futId; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: + if (!commState.putGridUuid(futId)) + return false; + + commState.idx++; + + case 4: + if (!commState.putCacheVersion(txId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: + IgniteUuid futId0 = commState.getGridUuid(); + + if (futId0 == GRID_UUID_NOT_READ) + return false; + + futId = futId0; + + commState.idx++; + + case 4: + GridCacheVersion txId0 = commState.getCacheVersion(); + + if (txId0 == CACHE_VER_NOT_READ) + return false; + + txId = txId0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 25; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return GridToStringBuilder.toString(GridDistributedTxFinishResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java new file mode 100644 index 0000000..0fc45c7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Transaction node mapping. + */ +public class GridDistributedTxMapping<K, V> implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Mapped node. */ + @GridToStringExclude + private ClusterNode node; + + /** Entries. */ + @GridToStringInclude + private Collection<IgniteTxEntry<K, V>> entries; + + /** Explicit lock flag. */ + private boolean explicitLock; + + /** DHT version. */ + private GridCacheVersion dhtVer; + + /** Copy on remove flag. */ + private boolean readOnly; + + /** {@code True} if this is last mapping for node. */ + private boolean last; + + /** IDs of backup nodes receiving last prepare request during this mapping. */ + private Collection<UUID> lastBackups; + + /** {@code True} if mapping is for near caches, {@code false} otherwise. */ + private boolean near; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDistributedTxMapping() { + // No-op. + } + + /** + * @param node Mapped node. + */ + public GridDistributedTxMapping(ClusterNode node) { + this.node = node; + + entries = new ConcurrentLinkedQueue<>(); + } + + /** + * @return IDs of backup nodes receiving last prepare request during this mapping. + */ + @Nullable public Collection<UUID> lastBackups() { + return lastBackups; + } + + /** + * @param lastBackups IDs of backup nodes receiving last prepare request during this mapping. + */ + public void lastBackups(@Nullable Collection<UUID> lastBackups) { + this.lastBackups = lastBackups; + } + + /** + * @return {@code True} if this is last mapping for node. + */ + public boolean last() { + return last; + } + + /** + * @param last If {@code True} this is last mapping for node. + */ + public void last(boolean last) { + this.last = last; + } + + /** + * @return {@code True} if mapping is for near caches, {@code false} otherwise. + */ + public boolean near() { + return near; + } + + /** + * @param near {@code True} if mapping is for near caches, {@code false} otherwise. + */ + public void near(boolean near) { + this.near = near; + } + + /** + * @return Node. + */ + public ClusterNode node() { + return node; + } + + /** + * @return Entries. + */ + public Collection<IgniteTxEntry<K, V>> entries() { + return entries; + } + + /** + * @param entries Mapped entries. + * @param readOnly Flag indicating that passed in collection is read-only. + */ + public void entries(Collection<IgniteTxEntry<K, V>> entries, boolean readOnly) { + this.entries = entries; + + // Set copy on remove flag as passed in collection is unmodifiable. + this.readOnly = true; + } + + /** + * @return {@code True} if lock is explicit. + */ + public boolean explicitLock() { + return explicitLock; + } + + /** + * Sets explicit flag to {@code true}. + */ + public void markExplicitLock() { + explicitLock = true; + } + + /** + * @return DHT version. + */ + public GridCacheVersion dhtVersion() { + return dhtVer; + } + + /** + * @param dhtVer DHT version. + */ + public void dhtVersion(GridCacheVersion dhtVer) { + this.dhtVer = dhtVer; + + for (IgniteTxEntry<K, V> e : entries) + e.dhtVersion(dhtVer); + } + + /** + * @return Reads. + */ + public Collection<IgniteTxEntry<K, V>> reads() { + return F.view(entries, CU.<K, V>reads()); + } + + /** + * @return Writes. + */ + public Collection<IgniteTxEntry<K, V>> writes() { + return F.view(entries, CU.<K, V>writes()); + } + + /** + * @param entry Adds entry. + */ + public void add(IgniteTxEntry<K, V> entry) { + ensureModifiable(); + + entries.add(entry); + } + + /** + * @param entry Entry to remove. + * @return {@code True} if entry was removed. + */ + public boolean removeEntry(IgniteTxEntry<K, V> entry) { + ensureModifiable(); + + return entries.remove(entry); + } + + /** + * @param parts Evicts partitions from mapping. + */ + public void evictPartitions(@Nullable int[] parts) { + if (!F.isEmpty(parts)) { + ensureModifiable(); + + evictPartitions(parts, entries); + } + } + + /** + * @param parts Partitions. + * @param c Collection. + */ + private void evictPartitions(int[] parts, Collection<IgniteTxEntry<K, V>> c) { + assert parts != null; + + for (Iterator<IgniteTxEntry<K, V>> it = c.iterator(); it.hasNext();) { + IgniteTxEntry<K, V> e = it.next(); + + GridCacheEntryEx<K,V> cached = e.cached(); + + if (U.containsIntArray(parts, cached.partition())) + it.remove(); + } + } + + /** + * @param keys Keys to evict readers for. + */ + public void evictReaders(@Nullable Collection<IgniteTxKey<K>> keys) { + if (keys == null || keys.isEmpty()) + return; + + ensureModifiable(); + + evictReaders(keys, entries); + } + + /** + * @param keys Keys to evict readers for. + * @param entries Entries to check. + */ + private void evictReaders(Collection<IgniteTxKey<K>> keys, @Nullable Collection<IgniteTxEntry<K, V>> entries) { + if (entries == null || entries.isEmpty()) + return; + + for (Iterator<IgniteTxEntry<K, V>> it = entries.iterator(); it.hasNext();) { + IgniteTxEntry<K, V> entry = it.next(); + + if (keys.contains(entry.txKey())) + it.remove(); + } + } + + /** + * Copies collection of entries if it is read-only. + */ + private void ensureModifiable() { + if (readOnly) { + entries = new ConcurrentLinkedQueue<>(entries); + + readOnly = false; + } + } + + /** {@inheritDoc} */ + public boolean empty() { + return entries.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(node); + + U.writeCollection(out, entries); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + node = (ClusterNode)in.readObject(); + + entries = U.readCollection(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDistributedTxMapping.class, this, "node", node.id()); + } +}