# ignite-283: WIP.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3beb04b2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3beb04b2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3beb04b2 Branch: refs/heads/ignite-283 Commit: 3beb04b202bae3e7d54fc3d423690218b390af86 Parents: f3b5731 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Feb 18 12:24:17 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Feb 18 12:24:18 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 4 +- .../cache/conflict/GridCacheConflictInfo.java | 33 ++------ .../conflict/GridCacheNoTtlConflictInfo.java | 53 +++++++++++-- .../conflict/GridCacheTtlConflictInfo.java | 82 +++++++++++++++++--- .../dht/atomic/GridDhtAtomicCache.java | 3 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 23 +++++- .../dht/atomic/GridNearAtomicUpdateRequest.java | 72 ++++++++--------- 7 files changed, 181 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3beb04b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index e1a0189..d8a42cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1562,7 +1562,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean res = true; - V oldVal = null; + V oldVal; V updated; GridCacheVersion enqueueVer = null; @@ -1789,7 +1789,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> // Calculate new value in case we met transform. if (op == GridCacheOperation.TRANSFORM) { - assert conflictCtx == null : "Cannot be TRANSFORM here is conflict resolution was performed earlier."; + assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier."; transformClo = writeObj; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3beb04b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java index 8f210cf..bc1621d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java @@ -1,53 +1,30 @@ package org.apache.ignite.internal.processors.cache.conflict; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; /** * Cache conflict info which is passed over the wire. */ -public abstract class GridCacheConflictInfo implements Externalizable { - /** - * Create conflict info. - * - * @param ver Version. - * @param ttl TTL. - * @param expireTime Expire time. - * @return Conflict info. - */ - public static GridCacheConflictInfo create(GridCacheVersion ver, long ttl, long expireTime) { - if (ttl == CU.TTL_NOT_CHANGED) { - assert expireTime == CU.EXPIRE_TIME_CALCULATE; - - return new GridCacheNoTtlConflictInfo(ver); - } - else { - assert ttl != CU.TTL_ZERO && ttl >= 0; - assert expireTime != CU.EXPIRE_TIME_CALCULATE && expireTime >= 0; - - return new GridCacheTtlConflictInfo(ver, ttl, expireTime); - } - } - +public interface GridCacheConflictInfo { /** * @return Version. */ - public abstract GridCacheVersion version(); + public GridCacheVersion version(); /** * @return TTL. */ - public abstract long ttl(); + public long ttl(); /** * @return Expire time. */ - public abstract long expireTime(); + public long expireTime(); /** * @return {@code True} if has expiration info. */ - public abstract boolean hasExpirationInfo(); + public boolean hasExpirationInfo(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3beb04b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java index 926feef..90898b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java @@ -2,13 +2,16 @@ package org.apache.ignite.internal.processors.cache.conflict; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import sun.plugin2.message.*; import java.io.*; +import java.nio.*; /** * Conflict info without TTL. */ -public class GridCacheNoTtlConflictInfo extends GridCacheConflictInfo { +public class GridCacheNoTtlConflictInfo extends MessageAdapter implements GridCacheConflictInfo { /** Version. */ private GridCacheVersion ver; @@ -48,13 +51,49 @@ public class GridCacheNoTtlConflictInfo extends GridCacheConflictInfo { return false; } - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(ver); + @Override + public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isTypeWritten()) { + if (!writer.writeByte(null, directType())) + return false; + + writer.onTypeWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("ver", ver)) + return false; + + writer.incrementState(); + + } + + return true; } - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ver = (GridCacheVersion)in.readObject(); + @Override + public boolean readFrom(ByteBuffer buf) { + reader.setBuffer(buf); + + switch (readState) { + case 0: + ver = reader.readMessage("ver"); + + if (!reader.isLastRead()) + return false; + + readState++; + + } + + return true; + } + + @Override + public byte directType() { + return 0; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3beb04b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java index f4e6f29..395f505 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java @@ -2,6 +2,7 @@ package org.apache.ignite.internal.processors.cache.conflict; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; import java.io.*; import java.nio.*; @@ -9,7 +10,7 @@ import java.nio.*; /** * Conflict info with TTL. */ -public class GridCacheTtlConflictInfo extends GridCacheConflictInfo { +public class GridCacheTtlConflictInfo extends MessageAdapter implements GridCacheConflictInfo { /** Version. */ private GridCacheVersion ver; @@ -62,17 +63,76 @@ public class GridCacheTtlConflictInfo extends GridCacheConflictInfo { return true; } - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(ver); - out.writeLong(ttl); - out.writeLong(expireTime); + @Override + public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isTypeWritten()) { + if (!writer.writeByte(null, directType())) + return false; + + writer.onTypeWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("expireTime", expireTime)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("ttl", ttl)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeMessage("ver", ver)) + return false; + + writer.incrementState(); + + } + + return true; } - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ver = (GridCacheVersion)in.readObject(); - ttl = in.readLong(); - expireTime = in.readLong(); + @Override public boolean readFrom(ByteBuffer buf) { + reader.setBuffer(buf); + + switch (readState) { + case 0: + expireTime = reader.readLong("expireTime"); + + if (!reader.isLastRead()) + return false; + + readState++; + + case 1: + ttl = reader.readLong("ttl"); + + if (!reader.isLastRead()) + return false; + + readState++; + + case 2: + ver = reader.readMessage("ver"); + + if (!reader.isLastRead()) + return false; + + readState++; + + } + + return true; + } + + @Override + public byte directType() { + return 0; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3beb04b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index a12de1b..687d9c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1701,7 +1701,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. req.filter(), replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, - new GridCacheConflictInnerUpdate(true, newConflictVer, newConflictTtl, newConflictExpireTime), + new GridCacheConflictInnerUpdate(newConflictInfo != null, newConflictVer, newConflictTtl, + newConflictExpireTime), intercept, req.subjectId(), taskName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3beb04b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 83344cf..ad46728 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -533,13 +533,13 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> GridCacheDrInfo<V> conflictPutVal = F.first(conflictPutVals); val = conflictPutVal.value(); - conflictInfo = GridCacheConflictInfo.create(conflictPutVal.version(), conflictPutVal.ttl(), + conflictInfo = create(conflictPutVal.version(), conflictPutVal.ttl(), conflictPutVal.expireTime()); } else if (conflictRmvVals != null) { // Conflict REMOVE. val = null; - conflictInfo = GridCacheConflictInfo.create(F.first(conflictRmvVals), CU.TTL_NOT_CHANGED, + conflictInfo = create(F.first(conflictRmvVals), CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE); } else { @@ -658,12 +658,12 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> GridCacheDrInfo<V> conflictPutVal = conflictPutValsIt.next(); val = conflictPutVal.value(); - conflictInfo = GridCacheConflictInfo.create(conflictPutVal.version(), conflictPutVal.ttl(), + conflictInfo = create(conflictPutVal.version(), conflictPutVal.ttl(), conflictPutVal.expireTime()); } else if (conflictRmvVals != null) { val = null; - conflictInfo = GridCacheConflictInfo.create(conflictRmvValsIt.next(), CU.TTL_NOT_CHANGED, + conflictInfo = create(conflictRmvValsIt.next(), CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE); } else { @@ -738,6 +738,21 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> doUpdate(pendingMappings); } + // TODO: IGNITE-283: Remove. + public static GridCacheConflictInfo create(GridCacheVersion ver, long ttl, long expireTime) { + if (ttl == CU.TTL_NOT_CHANGED) { + assert expireTime == CU.EXPIRE_TIME_CALCULATE; + + return new GridCacheNoTtlConflictInfo(ver); + } + else { + assert ttl != CU.TTL_ZERO && ttl >= 0; + assert expireTime != CU.EXPIRE_TIME_CALCULATE && expireTime >= 0; + + return new GridCacheTtlConflictInfo(ver, ttl, expireTime); + } + } + /** * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near * node and send updates in parallel to all participating nodes. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3beb04b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 0de8308..d3aceab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -485,103 +485,103 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im } switch (writer.state()) { - case 5: - if (!writer.writeCollection("drVers", conflictInfos, Type.MSG)) + case 3: + if (!writer.writeCollection("conflictInfos", conflictInfos, Type.MSG)) return false; writer.incrementState(); - case 6: + case 4: if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) return false; writer.incrementState(); - case 7: + case 5: if (!writer.writeBoolean("fastMap", fastMap)) return false; writer.incrementState(); - case 8: + case 6: if (!writer.writeObjectArray("filterBytes", filterBytes, Type.BYTE_ARR)) return false; writer.incrementState(); - case 9: + case 7: if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups)) return false; writer.incrementState(); - case 10: + case 8: if (!writer.writeMessage("futVer", futVer)) return false; writer.incrementState(); - case 11: + case 9: if (!writer.writeBoolean("hasPrimary", hasPrimary)) return false; writer.incrementState(); - case 12: + case 10: if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, Type.BYTE_ARR)) return false; writer.incrementState(); - case 13: + case 11: if (!writer.writeCollection("keyBytes", keyBytes, Type.BYTE_ARR)) return false; writer.incrementState(); - case 14: + case 12: if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) return false; writer.incrementState(); - case 15: + case 13: if (!writer.writeBoolean("retval", retval)) return false; writer.incrementState(); - case 16: + case 14: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 17: + case 15: if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); - case 18: + case 16: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 19: + case 17: if (!writer.writeLong("topVer", topVer)) return false; writer.incrementState(); - case 20: + case 18: if (!writer.writeMessage("updateVer", updateVer)) return false; writer.incrementState(); - case 21: + case 19: if (!writer.writeCollection("valBytes", valBytes, Type.MSG)) return false; @@ -600,15 +600,15 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im return false; switch (readState) { - case 5: - conflictInfos = reader.readCollection("drVers", Type.MSG); + case 3: + conflictInfos = reader.readCollection("conflictInfos", Type.MSG); if (!reader.isLastRead()) return false; readState++; - case 6: + case 4: expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); if (!reader.isLastRead()) @@ -616,7 +616,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 7: + case 5: fastMap = reader.readBoolean("fastMap"); if (!reader.isLastRead()) @@ -624,7 +624,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 8: + case 6: filterBytes = reader.readObjectArray("filterBytes", Type.BYTE_ARR, byte[].class); if (!reader.isLastRead()) @@ -632,7 +632,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 9: + case 7: forceTransformBackups = reader.readBoolean("forceTransformBackups"); if (!reader.isLastRead()) @@ -640,7 +640,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 10: + case 8: futVer = reader.readMessage("futVer"); if (!reader.isLastRead()) @@ -648,7 +648,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 11: + case 9: hasPrimary = reader.readBoolean("hasPrimary"); if (!reader.isLastRead()) @@ -656,7 +656,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 12: + case 10: invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", Type.BYTE_ARR, byte[].class); if (!reader.isLastRead()) @@ -664,7 +664,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 13: + case 11: keyBytes = reader.readCollection("keyBytes", Type.BYTE_ARR); if (!reader.isLastRead()) @@ -672,7 +672,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 14: + case 12: byte opOrd; opOrd = reader.readByte("op"); @@ -684,7 +684,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 15: + case 13: retval = reader.readBoolean("retval"); if (!reader.isLastRead()) @@ -692,7 +692,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 16: + case 14: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -700,7 +700,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 17: + case 15: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -712,7 +712,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 18: + case 16: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -720,7 +720,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 19: + case 17: topVer = reader.readLong("topVer"); if (!reader.isLastRead()) @@ -728,7 +728,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 20: + case 18: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -736,7 +736,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im readState++; - case 21: + case 19: valBytes = reader.readCollection("valBytes", Type.MSG); if (!reader.isLastRead())