# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ea39d669 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ea39d669 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ea39d669 Branch: refs/heads/ignite-51 Commit: ea39d669b0964b0d420c4e21f0f862667c88c41d Parents: 6445389 Author: sboikov <sboi...@gridgain.com> Authored: Mon Mar 2 18:41:15 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Mar 3 00:21:43 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 2 +- .../communication/GridIoMessageFactory.java | 6 + .../processors/cache/GridCacheAdapter.java | 24 +- .../processors/cache/GridCacheEntryEx.java | 21 +- .../processors/cache/GridCacheEntryInfo.java | 6 +- .../processors/cache/GridCacheMapEntry.java | 161 ++--------- .../processors/cache/GridCacheSwapManager.java | 4 +- .../distributed/GridCacheTtlUpdateRequest.java | 4 +- .../GridDistributedTxRemoteAdapter.java | 40 +-- .../distributed/dht/GridDhtCacheAdapter.java | 12 +- .../distributed/dht/GridDhtCacheEntry.java | 11 +- .../cache/distributed/dht/GridDhtGetFuture.java | 6 +- .../distributed/dht/GridDhtLockFuture.java | 2 +- .../distributed/dht/GridDhtLockRequest.java | 2 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 10 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 16 +- .../dht/GridDhtTxPrepareResponse.java | 24 +- .../cache/distributed/dht/GridDhtTxRemote.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 26 +- .../colocated/GridDhtColocatedLockFuture.java | 2 +- .../colocated/GridDhtDetachedCacheEntry.java | 7 +- .../distributed/near/GridNearCacheEntry.java | 11 +- .../distributed/near/GridNearLockFuture.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 6 +- .../near/GridNearTxPrepareFuture.java | 8 +- .../distributed/near/GridNearTxRemote.java | 2 +- .../cache/local/GridLocalTxFuture.java | 4 +- .../continuous/CacheContinuousQueryEntry.java | 272 +++++++++++-------- .../continuous/CacheContinuousQueryEvent.java | 32 ++- .../continuous/CacheContinuousQueryHandler.java | 64 ++--- .../continuous/CacheContinuousQueryManager.java | 80 +++--- .../cache/transactions/IgniteTxEntry.java | 29 +- .../transactions/IgniteTxLocalAdapter.java | 18 +- .../cache/transactions/IgniteTxManager.java | 8 +- .../continuous/GridContinuousMessage.java | 43 ++- .../continuous/GridContinuousProcessor.java | 40 ++- .../cache/GridCacheAbstractFullApiSelfTest.java | 2 +- .../cache/GridCacheStoreValueBytesSelfTest.java | 21 -- .../processors/cache/GridCacheTestEntryEx.java | 23 +- ...achePartitionedMultiNodeCounterSelfTest.java | 4 +- 41 files changed, 470 insertions(+), 589 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 68d8c0b..1b0c09c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -150,7 +150,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { } } - ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false); + ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false); } catch (IgniteCheckedException e) { U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 9642bfb..57b5ac4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.clock.*; @@ -528,6 +529,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 96: + msg = new CacheContinuousQueryEntry(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 34d65bf..58e21f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -963,12 +963,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (peek != null) { CacheObject v = peek.get(); - return v.value(ctx, true); -// TODO IGNITE-51 -// if (ctx.portableEnabled()) -// v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); -// -// return F.t(ctx.cloneOnFlag(v)); + Object val0 = v.value(ctx, true); + + if (ctx.portableEnabled()) + val0 = ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); + + return F.t((V)val0); } } @@ -980,12 +980,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (peek != null) { CacheObject v = peek.get(); - return v.value(ctx, true); -// TODO IGNITE-51 -// if (ctx.portableEnabled()) -// v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); -// -// return F.t(ctx.cloneOnFlag(v)); + Object val0 = v.value(ctx, true); + + if (ctx.portableEnabled()) + val0 = ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); + + return F.t((V) val0); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 30df242..5196965 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -565,17 +565,6 @@ public interface GridCacheEntryEx { public boolean markObsoleteVersion(GridCacheVersion ver); /** - * @return Key bytes. - */ - public byte[] keyBytes(); - - /** - * @return Key bytes. - * @throws IgniteCheckedException If marshalling failed. - */ - public byte[] getOrMarshalKeyBytes() throws IgniteCheckedException; - - /** * @return Version. * @throws GridCacheEntryRemovedException If entry has been removed. */ @@ -858,16 +847,10 @@ public interface GridCacheEntryEx { @Nullable public GridCacheMvccCandidate localOwner() throws GridCacheEntryRemovedException; /** - * @param keyBytes Key bytes. - * @throws GridCacheEntryRemovedException If entry was removed. - */ - public void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException; - - /** * @return Value bytes. * @throws GridCacheEntryRemovedException If entry was removed. */ - public GridCacheValueBytes valueBytes() throws GridCacheEntryRemovedException; + public CacheObject valueBytes() throws GridCacheEntryRemovedException; /** * Gets cached serialized value bytes. @@ -877,7 +860,7 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException If serialization failed. * @throws GridCacheEntryRemovedException If entry was removed. */ - @Nullable public GridCacheValueBytes valueBytes(@Nullable GridCacheVersion ver) + @Nullable public CacheObject valueBytes(@Nullable GridCacheVersion ver) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index 96d7ee2..0179ad0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -325,7 +325,8 @@ public class GridCacheEntryInfo implements Externalizable, Message { public void marshal(GridCacheContext ctx) throws IgniteCheckedException { key.prepareMarshal(ctx.cacheObjectContext()); - val.prepareMarshal(ctx.cacheObjectContext()); + if (val != null) + val.prepareMarshal(ctx.cacheObjectContext()); // TODO IGNITE-51 // boolean depEnabled = ctx.gridDeploy().enabled(); // @@ -352,7 +353,8 @@ public class GridCacheEntryInfo implements Externalizable, Message { public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteCheckedException { key.finishUnmarshal(ctx, clsLdr); - val.finishUnmarshal(ctx, clsLdr); + if (val != null) + val.finishUnmarshal(ctx, clsLdr); // TODO IGNITE-51 // Marshaller mrsh = ctx.marshaller(); // http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 21255e2..7d1dc73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -272,33 +272,21 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** * @return Value bytes. */ - protected GridCacheValueBytes valueBytesUnlocked() { + protected CacheObject valueBytesUnlocked() { assert Thread.holdsLock(this); - if (!isOffHeapValuesOnly()) { -// TODO IGNITE-51. -// if (valBytes != null) -// return GridCacheValueBytes.marshaled(valBytes); + CacheObject val0 = val; - try { - if (valPtr != 0 && cctx.offheapTiered()) - return offheapValueBytes(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - else { - if (valPtr != 0) { - GridUnsafeMemory mem = cctx.unsafeMemory(); - - assert mem != null; + if (val0 == null && valPtr != 0) { + IgniteBiTuple<byte[], Boolean> t = valueBytes0(); - return mem.getOffHeap(valPtr); - } + if (t.get2()) + val0 = cctx.toCacheObject(t.get1(), null); + else + val0 = cctx.toCacheObject(null, t.get1()); } - return GridCacheValueBytes.nil(); + return val0; } /** {@inheritDoc} */ @@ -439,33 +427,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { info.setNew(isStartVersion()); info.setDeleted(deletedUnlocked()); - if (!expired) { - CacheObject val0 = val; - - if (val0 == null && valPtr != 0) { - IgniteBiTuple<byte[], Boolean> t = valueBytes0(); - - if (t.get2()) - val0 = cctx.toCacheObject(t.get1(), null); - else - val0 = cctx.toCacheObject(null, t.get1()); - - } - - info.value(val0); -// TODO IGNITE-51. -// info.value(cctx.kernalContext().config().isPeerClassLoadingEnabled() ? -// rawGetOrUnmarshalUnlocked(false) : val); -// -// GridCacheValueBytes valBytes = valueBytesUnlocked(); -// -// if (!valBytes.isNull()) { -// if (valBytes.isPlain()) -// info.value((V)valBytes.get()); -// else -// info.valueBytes(valBytes.get()); -// } - } + if (!expired) + info.value(valueBytesUnlocked()); } } } @@ -611,6 +574,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @return Value bytes and flag indicating whether value is byte array. */ protected IgniteBiTuple<byte[], Boolean> valueBytes0() { + assert Thread.holdsLock(this); + if (valPtr != 0) { assert isOffHeapValuesOnly() || cctx.offheapTiered(); @@ -827,7 +792,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { taskName); } - cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null); + cctx.continuousQueries().onEntryExpired(this, key, expiredVal); // No more notifications. evt = false; @@ -1064,8 +1029,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val; - GridCacheValueBytes oldBytes = valueBytesUnlocked(); - if (intercept) { key0 = key.value(cctx, false); val0 = CU.value(val, cctx, false); @@ -1138,7 +1101,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1228,8 +1191,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } } - GridCacheValueBytes oldBytes = valueBytesUnlocked(); - if (old == null) old = saveValueForIndexUnlocked(); @@ -1295,7 +1256,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, null, old, false); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1388,8 +1349,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Possibly get old value form store. old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; - GridCacheValueBytes oldBytes = valueBytesUnlocked(); - boolean readThrough = false; Object old0 = null; @@ -1618,7 +1577,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -1821,7 +1780,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Prepare old value and value bytes. oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; - GridCacheValueBytes oldValBytes = valueBytesUnlocked(); // Possibly read value from store. boolean readThrough = false; @@ -2196,8 +2154,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { updateMetrics(op, metrics); if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), - oldVal, oldValBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -3298,8 +3255,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (!skipQryNtf) { if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null, - preload); + cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload); cctx.dataStructures().onEntryUpdated(key, false); } @@ -3617,7 +3573,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { null); } - cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null); + cctx.continuousQueries().onEntryExpired(this, key, expiredVal); } } } @@ -3707,84 +3663,25 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } /** {@inheritDoc} */ - @Override public synchronized void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException { - checkObsolete(); - -// TODO IGNITE-51. -// if (keyBytes != null) -// this.keyBytes = keyBytes; - } - - /** {@inheritDoc} */ - @Override public synchronized byte[] keyBytes() { -// TODO IGNITE-51. -// return keyBytes; - return null; - } - - /** {@inheritDoc} */ - @Override public byte[] getOrMarshalKeyBytes() throws IgniteCheckedException { -// TODO IGNITE-51. -// byte[] bytes = keyBytes(); -// -// if (bytes != null) -// return bytes; -// -// bytes = CU.marshal(cctx.shared(), key); -// -// synchronized (this) { -// keyBytes = bytes; -// } -// -// return bytes; - return null; - } - - /** {@inheritDoc} */ - @Override public synchronized GridCacheValueBytes valueBytes() throws GridCacheEntryRemovedException { + @Override public synchronized CacheObject valueBytes() throws GridCacheEntryRemovedException { checkObsolete(); return valueBytesUnlocked(); } /** {@inheritDoc} */ - @Nullable @Override public GridCacheValueBytes valueBytes(@Nullable GridCacheVersion ver) + @Nullable @Override public CacheObject valueBytes(@Nullable GridCacheVersion ver) throws IgniteCheckedException, GridCacheEntryRemovedException { CacheObject val = null; - GridCacheValueBytes valBytes = GridCacheValueBytes.nil(); -// TODO IGNITE-51. -// synchronized (this) { -// checkObsolete(); -// -// if (ver == null || this.ver.equals(ver)) { -// val = this.val; -// ver = this.ver; -// valBytes = valueBytesUnlocked(); -// -// if (valBytes.isNull() && cctx.offheapTiered() && valPtr != 0) -// valBytes = offheapValueBytes(); -// } -// else -// ver = null; -// } -// -// if (valBytes.isNull()) { -// if (val != null) -// valBytes = (val instanceof byte[]) ? GridCacheValueBytes.plain(val) : -// GridCacheValueBytes.marshaled(CU.marshal(cctx.shared(), val)); -// -// if (ver != null && !isOffHeapValuesOnly()) { -// synchronized (this) { -// checkObsolete(); -// -// if (this.val == val) -// this.valBytes = isStoreValueBytes() ? valBytes.getIfMarshaled() : null; -// } -// } -// } + synchronized (this) { + checkObsolete(); + + if (ver == null || this.ver.equals(ver)) + val = valueBytesUnlocked(); + } - return valBytes; + return val; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 012d393..ef04a8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -678,7 +678,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return null; return read(entry.key(), - entry.getOrMarshalKeyBytes(), + entry.key().valueBytes(cctx), entry.partition(), locked, readOffheap, @@ -698,8 +698,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - byte[] keyBytes = entry.getOrMarshalKeyBytes(); - IgniteBiTuple<Long, Integer> ptr = offheap.valuePointer(spaceName, part, key, key.valueBytes(cctx)); if (ptr != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java index 95b9095..028ab12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@ -67,12 +67,14 @@ public class GridCacheTtlUpdateRequest extends GridCacheMessage { } /** + * @param cacheId Cache ID. * @param topVer Topology version. * @param ttl TTL. */ - public GridCacheTtlUpdateRequest(long topVer, long ttl) { + public GridCacheTtlUpdateRequest(int cacheId, long topVer, long ttl) { assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl; + this.cacheId = cacheId; this.topVer = topVer; this.ttl = ttl; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 3ce5cd3..c787261 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -281,7 +281,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + ", tx=" + this + ']'); // Replace the entry. - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null); + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); } } } @@ -327,13 +327,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter IgniteTxEntry rmv = readMap.remove(e.txKey()); if (rmv != null) { - e.cached(rmv.cached(), null); + e.cached(rmv.cached()); writeMap.put(e.txKey(), e); } // If lock is explicit. else { - e.cached(e.context().cache().entryEx(e.key()), null); + e.cached(e.context().cache().entryEx(e.key())); // explicit lock. writeMap.put(e.txKey(), e); @@ -456,7 +456,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Got removed entry while committing (will retry): " + txEntry); - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null); + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); } } } @@ -484,7 +484,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter GridCacheEntryEx cached = txEntry.cached(); if (cached == null) - txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key()), null); + txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key())); if (near() && cacheCtx.dr().receiveEnabled()) { cached.markObsolete(xidVer); @@ -563,19 +563,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Keep near entry up to date. if (nearCached != null) { - CacheObject val0 = null; - - GridCacheValueBytes valBytesTuple = cached.valueBytes(); - - if (!valBytesTuple.isNull()) { -// TODO IGNITE-51. -// if (valBytesTuple.isPlain()) -// val0 = (V)valBytesTuple.get(); -// else -// valBytes0 = valBytesTuple.get(); - } - else - val0 = cached.rawGet(); + CacheObject val0 = cached.valueBytes(); nearCached.updateOrEvict(xidVer, val0, @@ -621,19 +609,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter cached.updateTtl(null, txEntry.ttl()); if (nearCached != null) { - CacheObject val0 = null; - - GridCacheValueBytes valBytesTuple = cached.valueBytes(); - - if (!valBytesTuple.isNull()) { -// TODO IGNITE-51. -// if (valBytesTuple.isPlain()) -// val0 = (V)valBytesTuple.get(); -// else -// valBytes0 = valBytesTuple.get(); - } - else - val0 = cached.rawGet(); + CacheObject val0 = cached.valueBytes(); nearCached.updateOrEvict(xidVer, val0, @@ -662,7 +638,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter log.debug("Attempting to commit a removed entry (will retry): " + txEntry); // Renew cached entry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 5febfcc..bb7d308 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -695,10 +695,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridCacheTtlUpdateRequest req = reqMap.get(node); if (req == null) { - reqMap.put(node, - req = new GridCacheTtlUpdateRequest(topVer, expiryPlc.forAccess())); - - req.cacheId(ctx.cacheId()); + reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), + topVer, + expiryPlc.forAccess())); } req.addEntry(e.getKey(), e.getValue()); @@ -718,10 +717,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridCacheTtlUpdateRequest req = reqMap.get(node); if (req == null) { - reqMap.put(node, req = new GridCacheTtlUpdateRequest(topVer, + reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), + topVer, expiryPlc.forAccess())); - - req.cacheId(ctx.cacheId()); } for (IgniteBiTuple<KeyCacheObject, GridCacheVersion> t : e.getValue()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 6e4eac8..15648fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -311,16 +311,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { if (isNew() || !valid(-1) || deletedUnlocked()) return null; else { - CacheObject val0 = val; - - if (val0 == null && valPtr != 0) { - IgniteBiTuple<byte[], Boolean> t = valueBytes0(); - - if (t.get2()) - val0 = cctx.toCacheObject(t.get1(), null); - else - val0 = cctx.toCacheObject(null, t.get1()); - } + CacheObject val0 = valueBytesUnlocked(); return F.t(ver, val0, null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index e9674c8..8eb0809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -419,12 +419,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) { GridCacheEntryInfo info = it.next(); - CacheObject v = map.get(info.key()); + Object v = map.get(info.key()); if (v == null) it.remove(); - else - info.value(v); + else if (!skipVals) + info.value((CacheObject)v); } return infos; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 056c3ad..6fcc7f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -906,7 +906,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo assert added.dhtLocal(); if (added.ownerVersion() != null) - req.owned(e.key(), e.getOrMarshalKeyBytes(), added.ownerVersion()); + req.owned(e.key(), added.ownerVersion()); break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 4f54f47..87c786d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -260,7 +260,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { * @param keyBytes Key bytes. * @param ownerMapped Owner mapped version. */ - public void owned(KeyCacheObject key, byte[] keyBytes, GridCacheVersion ownerMapped) { + public void owned(KeyCacheObject key, GridCacheVersion ownerMapped) { if (owned == null) owned = new GridLeanMap<>(3); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index d2b7d36..060b02c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -209,7 +209,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (e.cached().obsolete()) { GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key()); - e.cached(cached, cached.keyBytes()); + e.cached(cached); } if (e.cached().detached() || e.cached().isLocal()) @@ -234,7 +234,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { catch (GridCacheEntryRemovedException ignore) { GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key()); - e.cached(cached, cached.keyBytes()); + e.cached(cached); } } } @@ -462,7 +462,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion()); - entry.cached(cached, null); + entry.cached(cached); GridCacheVersion explicit = entry.explicitVersion(); @@ -562,7 +562,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (read) txEntry.ttl(accessTtl); - txEntry.cached(cached, null); + txEntry.cached(cached); addReader(msgId, cached, txEntry, topVer); } @@ -710,7 +710,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()), null); + txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion())); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 7f9022a..11101fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -429,7 +429,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (entry == null) { entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key()); - txEntry.cached(entry, null); + txEntry.cached(entry); } if (tx.optimistic() && txEntry.explicitVersion() == null) { @@ -455,7 +455,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key()); - txEntry.cached(entry, null); + txEntry.cached(entry); } } } @@ -625,7 +625,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridCacheVersion dhtVer = entry.version(); - CacheObject val0 = entry.rawGet(); + CacheObject val0 = entry.valueBytes(); if (val0 != null) res.addOwnedValue(txEntry.txKey(), dhtVer, val0); @@ -634,7 +634,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); } } } @@ -655,7 +655,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridCacheVersion dhtVer = entry.version(); if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) { - CacheObject val0 = entry.rawGet(); + CacheObject val0 = entry.valueBytes(); res.addOwnedValue(txEntry.txKey(), dhtVer, val0); } @@ -664,7 +664,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); } } } @@ -1015,7 +1015,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu catch (GridCacheEntryRemovedException ignore) { cached = dht.entryExx(entry.key()); - entry.cached(cached, cached.keyBytes()); + entry.cached(cached); } } @@ -1195,7 +1195,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (e == null) break; - entry.cached(e, null); + entry.cached(e); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java index 60c9c2f..cfec044 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java @@ -167,16 +167,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - GridCacheContext cctx = ctx.cacheContext(cacheId); - if (nearEvicted != null) { - for (IgniteTxKey key : nearEvicted) + for (IgniteTxKey key : nearEvicted) { + GridCacheContext cctx = ctx.cacheContext(key.cacheId()); + key.prepareMarshal(cctx); + } } if (preloadEntries != null) { - for (GridCacheEntryInfo info : preloadEntries) + for (GridCacheEntryInfo info : preloadEntries) { + GridCacheContext cctx = ctx.cacheContext(info.cacheId()); + info.marshal(cctx); + } } } @@ -184,16 +188,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - GridCacheContext cctx = ctx.cacheContext(cacheId); - if (nearEvicted != null) { - for (IgniteTxKey key : nearEvicted) + for (IgniteTxKey key : nearEvicted) { + GridCacheContext cctx = ctx.cacheContext(key.cacheId()); + key.finishUnmarshal(cctx, ldr); + } } if (preloadEntries != null) { - for (GridCacheEntryInfo info : preloadEntries) + for (GridCacheEntryInfo info : preloadEntries) { + GridCacheContext cctx = ctx.cacheContext(info.cacheId()); + info.unmarshal(cctx, ldr); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 3f9ce8b..8dc91c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -264,7 +264,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { checkInternal(entry.txKey()); // Initialize cache entry. - entry.cached(cached, null); + entry.cached(cached); writeMap.put(entry.txKey(), entry); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index adcaebf..2012252 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2032,7 +2032,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { else res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); - if (writeVal != null || !entry.valueBytes().isNull()) { + if (writeVal != null || entry.hasValue()) { IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); assert f == null : f; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index a9a26c6..567bf67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -327,12 +327,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem GridCacheReturn ret = (GridCacheReturn)res; - if (op != TRANSFORM && ret != null) { - CacheObject val = (CacheObject)ret.value(); - - ret.value(CU.value(val, cctx, false)); - } - Object retval = res == null ? null : rawRetval ? ret : this.retval ? ret.value() : ret.success(); if (op == TRANSFORM && retval == null) @@ -362,6 +356,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem return; } + GridCacheReturn ret = res.returnValue(); + + if (op != TRANSFORM && ret != null) { + CacheObject val = (CacheObject)ret.value(); + + ret.value(CU.value(val, cctx, false)); + } + Boolean single0 = single; if (single0 != null && single0) { @@ -374,13 +376,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem onDone(addFailedKeys(res.failedKeys(), res.error())); else { if (op == TRANSFORM) { - if (res.returnValue() != null) - addInvokeResults(res.returnValue()); + if (ret != null) + addInvokeResults(ret); onDone(opRes); } else { - GridCacheReturn<?> opRes0 = opRes = res.returnValue(); + GridCacheReturn<?> opRes0 = opRes = ret; onDone(opRes0); } @@ -398,11 +400,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem if (op == TRANSFORM) { assert !req.fastMap(); - if (res.returnValue() != null) - addInvokeResults(res.returnValue()); + if (ret != null) + addInvokeResults(ret); } else if (req.fastMap() && req.hasPrimary()) - opRes = res.returnValue(); + opRes = ret; } mappings.remove(nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 29e9730..441c2fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -270,7 +270,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (inTx()) { IgniteTxEntry txEntry = tx.entry(entry.txKey()); - txEntry.cached(entry, null); + txEntry.cached(entry); if (cand != null) { if (!tx.implicit()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index b9602d9..154d99e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -67,11 +67,8 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected GridCacheValueBytes valueBytesUnlocked() { - return null; -// TODO IGNITE-51. -// return (val != null && val instanceof byte[]) ? GridCacheValueBytes.plain(val) : -// valBytes == null ? GridCacheValueBytes.nil() : GridCacheValueBytes.marshaled(valBytes); + @Override protected CacheObject valueBytesUnlocked() { + return val; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index b47d288..d06ca5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -273,16 +273,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { if (dhtVer == null) return null; else { - CacheObject val0 = val; - - if (val0 == null && valPtr != 0) { - IgniteBiTuple<byte[], Boolean> t = valueBytes0(); - - if (t.get2()) - val0 = cctx.toCacheObject(t.get1(), null); - else - val0 = cctx.toCacheObject(null, t.get1()); - } + CacheObject val0 = valueBytesUnlocked(); return F.t(ver, val0, null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 9da55ea..c855b47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -316,7 +316,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (inTx()) { IgniteTxEntry txEntry = tx.entry(entry.txKey()); - txEntry.cached(entry, null); + txEntry.cached(entry); } if (c != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 8d6800d..ef2899a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -582,7 +582,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { ", tx=" + this + ']'); // Replace the entry. - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), entry.keyBytes()); + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); } } } @@ -1129,7 +1129,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (cached.obsoleteVersion() != null) { cached = cacheCtx.colocated().entryExx(key.key(), topologyVersion(), true); - txEntry.cached(cached, null); + txEntry.cached(cached); } return cached; @@ -1156,7 +1156,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (cached.obsoleteVersion() != null) { cached = cacheCtx.colocated().entryExx(key.key(), topVer, true); - txEntry.cached(cached, null); + txEntry.cached(cached); } return cached; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 491a171..9bb3aa7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -778,11 +778,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut // Must re-initialize cached entry while holding topology lock. if (cacheCtx.isNear()) - entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer), null); + entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); else if (!cacheCtx.isLocal()) - entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true), null); + entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true)); else - entry.cached(cacheCtx.local().entryEx(entry.key(), topVer), null); + entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); if (cacheCtx.isNear() || cacheCtx.isLocal()) { if (waitLock && entry.explicitVersion() == null) { @@ -812,7 +812,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut break; } catch (GridCacheEntryRemovedException ignore) { - entry.cached(cacheCtx.near().entryEx(entry.key()), null); + entry.cached(cacheCtx.near().entryEx(entry.key())); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 2b40e88..ccd4c8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -299,7 +299,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { } else { // Initialize cache entry. - entry.cached(cached, null); + entry.cached(cached); writeMap.put(entry.txKey(), entry); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java index 619ce38..bc248aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java @@ -218,7 +218,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx> if (log.isDebugEnabled()) log.debug("Got removed entry in checkLocks method (will retry): " + txEntry); - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null); + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); } } } @@ -264,7 +264,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx> if (log.isDebugEnabled()) log.debug("Got removed entry in onOwnerChanged method (will retry): " + txEntry); - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null); + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index d98b254..43aaec3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -18,144 +18,145 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.marshaller.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; -import java.io.*; - -import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*; +import javax.cache.event.*; +import java.nio.*; /** * Continuous query entry. */ -class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externalizable { +public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { + /** */ + private static final EventType[] EVT_TYPE_VALS = EventType.values(); + + /** + * @param ord Event type ordinal value. + * @return Event type. + */ + @Nullable public static EventType eventTypeFromOrdinal(int ord) { + return ord >= 0 && ord < EVT_TYPE_VALS.length ? EVT_TYPE_VALS[ord] : null; + } + /** */ - private static final long serialVersionUID = 0L; + private EventType evtType; /** Key. */ @GridToStringInclude - private K key; + private KeyCacheObject key; /** New value. */ @GridToStringInclude - private V newVal; + private CacheObject newVal; /** Old value. */ @GridToStringInclude - private V oldVal; - - /** Serialized key. */ - @GridToStringExclude - private byte[] keyBytes; - - /** Serialized value. */ - @GridToStringExclude - private GridCacheValueBytes newValBytes; - - /** Serialized value. */ - @GridToStringExclude - private GridCacheValueBytes oldValBytes; + private CacheObject oldVal; /** Cache name. */ - private String cacheName; + private int cacheId; /** Deployment info. */ @GridToStringExclude + @GridDirectTransient private GridDeploymentInfo depInfo; + /** + * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. + */ public CacheContinuousQueryEntry() { // No-op. } - CacheContinuousQueryEntry(K key, @Nullable V newVal, @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal, - @Nullable GridCacheValueBytes oldValBytes) { - + /** + * @param cacheId Cache ID. + * @param evtType Event type. + * @param key Key. + * @param newVal New value. + * @param oldVal Old value. + */ + CacheContinuousQueryEntry( + int cacheId, + EventType evtType, + KeyCacheObject key, + @Nullable CacheObject newVal, + @Nullable CacheObject oldVal) { + this.cacheId = cacheId; + this.evtType = evtType; this.key = key; this.newVal = newVal; - this.newValBytes = newValBytes; this.oldVal = oldVal; - this.oldValBytes = oldValBytes; } /** - * @param cacheName Cache name. + * @return Cache ID. */ - void cacheName(String cacheName) { - this.cacheName = cacheName; + int cacheId() { + return cacheId; } /** - * @return cache name. + * @return Event type. */ - String cacheName() { - return cacheName; + EventType eventType() { + return evtType; } /** - * @param marsh Marshaller. + * @param cctx Cache context. * @throws IgniteCheckedException In case of error. */ - void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { - assert marsh != null; - + void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException { assert key != null; - keyBytes = marsh.marshal(key); + key.prepareMarshal(cctx.cacheObjectContext()); - if (newValBytes == null || newValBytes.isNull()) - newValBytes = newVal != null ? - newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null; + if (newVal != null) + newVal.prepareMarshal(cctx.cacheObjectContext()); - if (oldValBytes == null || oldValBytes.isNull()) - oldValBytes = oldVal != null ? - oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null; + if (oldVal != null) + oldVal.prepareMarshal(cctx.cacheObjectContext()); } /** - * @param marsh Marshaller. + * @param cctx Cache context. * @param ldr Class loader. * @throws IgniteCheckedException In case of error. */ - void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; + void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { + key.finishUnmarshal(cctx, ldr); - assert key == null : "Key should be null: " + key; - assert newVal == null : "New value should be null: " + newVal; - assert oldVal == null : "Old value should be null: " + oldVal; - assert keyBytes != null; + if (newVal != null) + newVal.finishUnmarshal(cctx, ldr); - key = marsh.unmarshal(keyBytes, ldr); - - if (newValBytes != null && !newValBytes.isNull()) - newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); - - if (oldValBytes != null && !oldValBytes.isNull()) - oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr); + if (oldVal != null) + oldVal.finishUnmarshal(cctx, ldr); } /** * @return Key. */ - K key() { + KeyCacheObject key() { return key; } /** * @return New value. */ - V value() { + CacheObject value() { return newVal; } /** * @return Old value. */ - V oldValue() { + CacheObject oldValue() { return oldVal; } @@ -170,62 +171,117 @@ class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externaliz } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - boolean b = keyBytes != null; - - out.writeBoolean(b); - - if (b) { - U.writeByteArray(out, keyBytes); - - if (newValBytes != null && !newValBytes.isNull()) { - out.writeBoolean(true); - out.writeBoolean(newValBytes.isPlain()); - U.writeByteArray(out, newValBytes.get()); - } - else - out.writeBoolean(false); - - if (oldValBytes != null && !oldValBytes.isNull()) { - out.writeBoolean(true); - out.writeBoolean(oldValBytes.isPlain()); - U.writeByteArray(out, oldValBytes.get()); - } - else - out.writeBoolean(false); - - U.writeString(out, cacheName); - out.writeObject(depInfo); + @Override public byte directType() { + return 96; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); } - else { - out.writeObject(key); - out.writeObject(newVal); - out.writeObject(oldVal); + + switch (writer.state()) { + case 0: + if (!writer.writeInt("cacheId", cacheId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeMessage("key", key)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeMessage("newVal", newVal)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMessage("oldVal", oldVal)) + return false; + + writer.incrementState(); + } + + return true; } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - boolean b = in.readBoolean(); + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); - if (b) { - keyBytes = U.readByteArray(in); + if (!reader.beforeMessageRead()) + return false; - if (in.readBoolean()) - newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); + switch (reader.state()) { + case 0: + cacheId = reader.readInt("cacheId"); - if (in.readBoolean()) - oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + byte evtTypeOrd; + + evtTypeOrd = reader.readByte("evtType"); + + if (!reader.isLastRead()) + return false; + + evtType = eventTypeFromOrdinal(evtTypeOrd); + + reader.incrementState(); + + case 2: + key = reader.readMessage("key"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + newVal = reader.readMessage("newVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + oldVal = reader.readMessage("oldVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); - cacheName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - else { - key = (K)in.readObject(); - newVal = (V)in.readObject(); - oldVal = (V)in.readObject(); } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index c90ae34..1bdadaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -27,43 +28,45 @@ import javax.cache.event.*; * Continuous query event. */ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { + /** */ + private final GridCacheContext cctx; + /** Entry. */ @GridToStringExclude - private final CacheContinuousQueryEntry<K, V> e; + private final CacheContinuousQueryEntry e; /** - * @param source Source cache. - * @param eventType Event type. + * @param src Source cache. + * @param cctx Cache context. * @param e Entry. */ - CacheContinuousQueryEvent(Cache source, EventType eventType, CacheContinuousQueryEntry<K, V> e) { - super(source, eventType); - - assert e != null; + CacheContinuousQueryEvent(Cache src, GridCacheContext cctx, CacheContinuousQueryEntry e) { + super(src, e.eventType()); + this.cctx = cctx; this.e = e; } /** * @return Entry. */ - CacheContinuousQueryEntry<K, V> entry() { + CacheContinuousQueryEntry entry() { return e; } /** {@inheritDoc} */ @Override public K getKey() { - return e.key(); + return e.key().value(cctx, false); } /** {@inheritDoc} */ @Override public V getValue() { - return e.value(); + return CU.value(e.value(), cctx, false); } /** {@inheritDoc} */ @Override public V getOldValue() { - return e.oldValue(); + return CU.value(e.oldValue(), cctx, false); } /** {@inheritDoc} */ @@ -81,7 +84,10 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(CacheContinuousQueryEvent.class, this, "key", e.key(), "newVal", e.value(), "oldVal", - e.oldValue(), "cacheName", e.cacheName()); + return S.toString(CacheContinuousQueryEvent.class, this, + "evtType", getEventType(), + "key", getKey(), + "newVal", getValue(), + "oldVal", getOldValue()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 9502b3f..53a2cdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -212,17 +212,17 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (ctx.config().isPeerClassLoadingEnabled() && node != null && U.hasCache(node, cacheName)) { - evt.entry().p2pMarshal(ctx.config().getMarshaller()); - - evt.entry().cacheName(cacheName); + evt.entry().prepareMarshal(cctx); GridCacheDeploymentManager depMgr = ctx.cache().internalCache(cacheName).context().deploy(); depMgr.prepare(evt.entry()); } + else + evt.entry().prepareMarshal(cctx); - ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync); + ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true); } catch (IgniteCheckedException ex) { U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); @@ -302,46 +302,42 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { assert objs != null; assert ctx != null; - Collection<CacheEntryEvent<? extends K, ? extends V>> evts = - (Collection<CacheEntryEvent<? extends K, ? extends V>>)objs; - - if (ctx.config().isPeerClassLoadingEnabled()) { - for (CacheEntryEvent<? extends K, ? extends V> evt : evts) { - assert evt instanceof CacheContinuousQueryEvent; + Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs; - CacheContinuousQueryEntry<? extends K, ? extends V> e = ((CacheContinuousQueryEvent)evt).entry(); + final GridCacheContext cctx = cacheContext(ctx); - GridCacheAdapter cache = ctx.cache().internalCache(e.cacheName()); + for (CacheContinuousQueryEntry e : entries) { + GridCacheDeploymentManager depMgr = cctx.deploy(); - ClassLoader ldr = null; + ClassLoader ldr = depMgr.globalLoader(); - if (cache != null) { - GridCacheDeploymentManager depMgr = cache.context().deploy(); + if (ctx.config().isPeerClassLoadingEnabled()) { + GridDeploymentInfo depInfo = e.deployInfo(); - GridDeploymentInfo depInfo = e.deployInfo(); - - if (depInfo != null) { - depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), - depInfo.participants(), depInfo.localDeploymentOwner()); - } - - ldr = depMgr.globalLoader(); - } - else { - U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " + - "when peer class loading is enabled: " + e.cacheName() + ". Will try to unmarshal " + - "with default class loader."); + if (depInfo != null) { + depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), + depInfo.participants(), depInfo.localDeploymentOwner()); } + } - try { - e.p2pUnmarshal(ctx.config().getMarshaller(), ldr); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); - } + try { + e.unmarshal(cctx, ldr); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); } } + final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); + + Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, + new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { + @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { + return new CacheContinuousQueryEvent<K, V>(cache, cctx, e); + }; + } + ); + locLsnr.onUpdated(evts); }