http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 45b13b7..d2b7d36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -464,24 +464,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { entry.cached(cached, null); -// TODO IGNITE-51. -// while (true) { -// GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion()); -// -// try { -// // Set key bytes to avoid serializing in future. -// cached.keyBytes(entry.keyBytes()); -// -// entry.cached(cached, null); -// -// break; -// } -// catch (GridCacheEntryRemovedException ignore) { -// if (log.isDebugEnabled()) -// log.debug("Got removed entry when adding to dht tx (will retry): " + cached); -// } -// } - GridCacheVersion explicit = entry.explicitVersion(); if (explicit != null) { @@ -641,17 +623,15 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { GridDhtTransactionalCacheAdapter<?, ?> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); - IgniteInternalFuture<Boolean> fut = null; -// TODO IGNTIE-51 -// IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, -// lockTimeout(), -// this, -// isInvalidate(), -// read, -// /*retval*/false, -// isolation, -// accessTtl, -// CU.empty()); + IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, + lockTimeout(), + this, + isInvalidate(), + read, + /*retval*/false, + isolation, + accessTtl, + (IgnitePredicate[])CU.empty()); return new GridEmbeddedFuture<>( fut,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 369935b..7f9022a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -340,7 +340,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } if (err != null || procRes != null) - ret.addEntryProcessResult(key, + ret.addEntryProcessResult(key.value(cacheCtx, false), err == null ? new CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err)); else ret.invokeResult(true); @@ -1240,7 +1240,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE; try { - if (entry.initialValue(info.value(), null, info.version(), + if (entry.initialValue(info.value(), info.version(), info.ttl(), info.expireTime(), true, topVer, drType)) { if (rec && !entry.isInternal()) cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 2423fe1..ec45af1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -328,73 +328,73 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } switch (writer.state()) { - case 23: + case 24: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 24: + case 25: if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries)) return false; writer.incrementState(); - case 25: + case 26: if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); - case 26: + case 27: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 27: + case 28: if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); - case 28: + case 29: if (!writer.writeCollection("nearWritesBytes", nearWritesBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); - case 29: + case 30: if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); - case 30: + case 31: if (!writer.writeByteArray("ownedBytes", ownedBytes)) return false; writer.incrementState(); - case 31: + case 32: if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); - case 32: + case 33: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 33: + case 34: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 34: + case 35: if (!writer.writeLong("topVer", topVer)) return false; @@ -416,7 +416,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { return false; switch (reader.state()) { - case 23: + case 24: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -424,7 +424,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 24: + case 25: invalidateNearEntries = reader.readBitSet("invalidateNearEntries"); if (!reader.isLastRead()) @@ -432,7 +432,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 25: + case 26: last = reader.readBoolean("last"); if (!reader.isLastRead()) @@ -440,7 +440,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 26: + case 27: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -448,7 +448,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 27: + case 28: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) @@ -456,7 +456,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 28: + case 29: nearWritesBytes = reader.readCollection("nearWritesBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) @@ -464,7 +464,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 29: + case 30: nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) @@ -472,7 +472,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 30: + case 31: ownedBytes = reader.readByteArray("ownedBytes"); if (!reader.isLastRead()) @@ -480,7 +480,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 31: + case 32: preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) @@ -488,7 +488,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 32: + case 33: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -496,7 +496,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 33: + case 34: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -504,7 +504,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 34: + case 35: topVer = reader.readLong("topVer"); if (!reader.isLastRead()) @@ -524,6 +524,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 35; + return 36; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 96777c9..603141b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -60,7 +60,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M private GridCacheContext<K, V> cctx; /** Keys. */ - private Collection<? extends K> keys; + private Collection<KeyCacheObject> keys; /** Topology version. */ private long topVer; @@ -127,7 +127,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M */ public GridPartitionedGetFuture( GridCacheContext<K, V> cctx, - Collection<? extends K> keys, + Collection<KeyCacheObject> keys, long topVer, boolean readThrough, boolean reload, @@ -167,21 +167,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M public void init() { long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); - Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() { - @Override public KeyCacheObject apply(K key) { - if (key == null) { - NullPointerException err = new NullPointerException("Null key."); - - onDone(err); - - throw err; - } - - return cctx.toCacheKeyObject(key); - } - }); - - map(keys0, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); + map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); markInitialized(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 5e38db3..adcaebf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -276,6 +276,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean deserializePortable, final boolean skipVals ) { + ctx.checkSecurity(GridSecurityPermission.CACHE_READ); + + if (F.isEmpty(keys)) + return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); + + if (keyCheck) + validateCacheKeys(keys); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); subjId = ctx.subjectIdPerCall(null, prj); @@ -286,7 +294,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { @Override public IgniteInternalFuture<Map<K, V>> apply() { - return getAllAsync0(keys, + return getAllAsync0(ctx.cacheKeysView(keys), false, forcePrimary, subjId0, @@ -624,7 +632,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys, long timeout, @Nullable IgniteTxLocalEx tx, boolean isInvalidate, @@ -895,7 +903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param skipVals Skip values flag. * @return Get future. */ - private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, + private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys, boolean reload, boolean forcePrimary, UUID subjId, @@ -903,14 +911,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean deserializePortable, @Nullable ExpiryPolicy expiryPlc, boolean skipVals) { - ctx.checkSecurity(GridSecurityPermission.CACHE_READ); - - if (F.isEmpty(keys)) - return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - - if (keyCheck) - validateCacheKeys(keys); - long topVer = ctx.affinity().affinityTopologyVersion(); final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); @@ -922,17 +922,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean success = true; // Optimistically expect that all keys are available locally (avoid creation of get future). - for (K key : keys) { - if (key == null) - throw new NullPointerException("Null key."); - - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - + for (KeyCacheObject key : keys) { GridCacheEntryEx entry = null; while (true) { try { - entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey); + entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key); // If our DHT cache do has value, then we peek it. if (entry != null) { @@ -956,12 +951,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheVersion obsoleteVer = context().versions().next(); if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeIfObsolete(cacheKey); + removeIfObsolete(key); success = false; } else - ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable, false); + ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, false); } else success = false; @@ -998,7 +993,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (success) { sendTtlUpdateRequest(expiry); - return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals)); + return new GridFinishedFuture<>(ctx.kernalContext(), locVals); } } @@ -1020,7 +1015,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { fut.init(); - return ctx.wrapCloneMap(fut); + return fut; } /** @@ -1700,7 +1695,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { locNodeId, op, writeVal, - null, req.invokeArguments(), primary && writeThrough(), req.returnValue(), @@ -1965,7 +1959,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op, writeVal, null, - null, false, false, expiry, @@ -2460,7 +2453,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { nodeId, op, op == TRANSFORM ? entryProcessor : val, - null, op == TRANSFORM ? req.invokeArguments() : null, /*write-through*/false, /*retval*/false, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index e2b974c..a9a26c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -327,7 +327,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem GridCacheReturn ret = (GridCacheReturn)res; - if (op != TRANSFORM) { + if (op != TRANSFORM && ret != null) { CacheObject val = (CacheObject)ret.value(); ret.value(CU.value(val, cctx, false)); @@ -357,7 +357,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem if (res.remapKeys() != null) { assert cctx.config().getAtomicWriteOrderMode() == PRIMARY; - // TODO IGNITE-51. mapOnTopology(res.remapKeys(), true, nodeId); return; @@ -809,7 +808,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) - onDone(new GridCacheReturn<Object>(null, true)); + onDone(new GridCacheReturn(null, true)); } catch (IgniteCheckedException e) { onDone(addFailedKeys(req.keys(), e)); @@ -928,8 +927,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem if (err0 == null) err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - // TODO IGNITE-51. - err0.add(failedKeys, err); + List<Object> keys = new ArrayList<>(failedKeys.size()); + + for (KeyCacheObject key : failedKeys) + keys.add(key.value(cctx, false)); + + err0.add(keys, err); return err0; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 84b32ce..a22b31c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -183,12 +183,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); + if (keyCheck) + validateCacheKeys(keys); + IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(); if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { - return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals)); + return tx.getAllAsync(ctx, + ctx.cacheKeysView(keys), + entry, + deserializePortable, + skipVals, + false); } }); } @@ -200,7 +208,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId = ctx.subjectIdPerCall(subjId, prj); return loadAsync( - keys, + ctx.cacheKeysView(keys), true, false, forcePrimary, @@ -232,9 +240,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param taskName Task name. * @param deserializePortable Deserialize portable flag. * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. * @return Loaded values. */ - public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, + public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys, boolean readThrough, boolean reload, boolean forcePrimary, @@ -248,9 +257,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - if (keyCheck) - validateCacheKeys(keys); - if (expiryPlc == null) expiryPlc = expiryPolicy(null); @@ -261,17 +267,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean success = true; // Optimistically expect that all keys are available locally (avoid creation of get future). - for (K key : keys) { - if (key == null) - throw new NullPointerException("Null key."); - + for (KeyCacheObject key : keys) { GridCacheEntryEx entry = null; - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - while (true) { try { - entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey); + entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key); // If our DHT cache do has value, then we peek it. if (entry != null) { @@ -295,12 +296,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte GridCacheVersion obsoleteVer = context().versions().next(); if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeIfObsolete(cacheKey); + removeIfObsolete(key); success = false; } else - ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable, true); + ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); } else success = false; @@ -337,7 +338,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (success) { sendTtlUpdateRequest(expiryPlc); - return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals)); + return new GridFinishedFuture<>(ctx.kernalContext(), locVals); } } @@ -360,7 +361,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte fut.init(); - return ctx.wrapCloneMap(fut); + return fut; } /** @@ -369,7 +370,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAllAsync( - Collection<? extends K> keys, + Collection<KeyCacheObject> keys, long timeout, @Nullable IgniteTxLocalEx tx, boolean isInvalidate, @@ -426,7 +427,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte Collection<KeyCacheObject> locKeys = new ArrayList<>(); for (K key : keys) { - // TODO IGNITE-51. KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); GridDistributedCacheEntry entry = peekExx(cacheKey); @@ -519,7 +519,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param ver Lock version. * @param keys Keys. */ - public void removeLocks(long threadId, GridCacheVersion ver, Collection<? extends K> keys) { + public void removeLocks(long threadId, GridCacheVersion ver, Collection<KeyCacheObject> keys) { if (keys.isEmpty()) return; @@ -530,11 +530,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte Collection<KeyCacheObject> locKeys = new LinkedList<>(); - for (K key : keys) { - // TODO IGNITE-51. - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - - GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, cacheKey, ver); + for (KeyCacheObject key : keys) { + GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, key, ver); if (lock != null) { long topVer = lock.topologyVersion(); @@ -559,14 +556,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte req.version(ver); } - GridCacheEntryEx entry = peekEx(cacheKey); + GridCacheEntryEx entry = peekEx(key); - KeyCacheObject key0 = entry != null ? entry.key() : cacheKey; + KeyCacheObject key0 = entry != null ? entry.key() : key; req.addKey(key0, ctx); } else - locKeys.add(cacheKey); + locKeys.add(key); } } @@ -616,7 +613,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte final long threadId, final GridCacheVersion ver, final long topVer, - final Collection<K> keys, + final Collection<KeyCacheObject> keys, final boolean txRead, final long timeout, final long accessTtl, @@ -624,9 +621,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert keys != null; - // TODO IGNITE-51. - // IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); - IgniteInternalFuture<Object> keyFut = null; + IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); // Prevent embedded future creation if possible. if (keyFut.isDone()) { @@ -691,7 +686,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte long threadId, final GridCacheVersion ver, final long topVer, - final Collection<K> keys, + final Collection<KeyCacheObject> keys, final boolean txRead, final long timeout, final long accessTtl, @@ -717,15 +712,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean timedout = false; - for (K key : keys) { + for (KeyCacheObject key : keys) { if (timedout) break; - // TODO IGNITE-51. - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - while (true) { - GridDhtCacheEntry entry = entryExx(cacheKey, topVer); + GridDhtCacheEntry entry = entryExx(key, topVer); try { fut.addEntry(key == null ? null : entry); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 7d007a1..29e9730 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -66,7 +66,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity private long threadId; /** Keys to lock. */ - private Collection<? extends K> keys; + private Collection<KeyCacheObject> keys; /** Future ID. */ private IgniteUuid futId; @@ -133,7 +133,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity */ public GridDhtColocatedLockFuture( GridCacheContext<K, V> cctx, - Collection<? extends K> keys, + Collection<KeyCacheObject> keys, @Nullable GridNearTxLocal tx, boolean read, boolean retval, @@ -535,8 +535,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity // Continue mapping on the same topology version as it was before. topSnapshot.compareAndSet(null, snapshot); - // TODO IGNITE-51. - // map(keys); + map(keys); markInitialized(); @@ -569,8 +568,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity topSnapshot.compareAndSet(null, snapshot); - // TODO IGNITE-51. - // map(keys); + map(keys); markInitialized(); } @@ -881,9 +879,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity threadId, lockVer, topVer, - // TODO IGNITE-51. - // keys, - null, + keys, read, timeout, accessTtl, @@ -1244,7 +1240,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } // Set value to detached entry. - entry.resetFromPrimary(newVal, null, dhtVer); + entry.resetFromPrimary(newVal, dhtVer); if (log.isDebugEnabled()) log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index 944034c..b9602d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -46,16 +46,12 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { * Sets value to detached entry so it can be retrieved in transactional gets. * * @param val Value. - * @param valBytes Value bytes. * @param ver Version. * @throws IgniteCheckedException If value unmarshalling failed. */ - public void resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver) + public void resetFromPrimary(CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { - if (valBytes != null && val == null) - val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); - - value(val, valBytes); + value(val); this.ver = ver; } @@ -66,7 +62,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void value(@Nullable CacheObject val, @Nullable byte[] valBytes) { + @Override protected void value(@Nullable CacheObject val) { this.val = val; } @@ -79,7 +75,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime, + @Override protected void updateIndex(CacheObject val, long expireTime, GridCacheVersion ver, CacheObject old) throws IgniteCheckedException { // No-op for detached entries, index is updated on primary nodes. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 66efe48..1338788 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -503,7 +503,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec try { if (entry.initialValue( info.value(), - null, info.version(), info.ttl(), info.expireTime(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 6ebd8d2..4e5beed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -506,7 +506,6 @@ public class GridDhtPartitionDemandPool<K, V> { if (preloadPred == null || preloadPred.apply(entry)) { if (cached.initialValue( entry.value(), - null, entry.version(), entry.ttl(), entry.expireTime(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index a30f372..4ebcced 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -219,7 +219,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { nodeId, op, val, - valBytes, null, /*write-through*/false, /*retval*/false, @@ -316,7 +315,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { nodeId, op, op == TRANSFORM ? entryProcessor : val, - null, op == TRANSFORM ? req.invokeArguments() : null, /*write-through*/false, /*retval*/false, @@ -650,7 +648,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys, long timeout, @Nullable IgniteTxLocalEx tx, boolean isInvalidate, @@ -659,7 +657,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Nullable TransactionIsolation isolation, long accessTtl, IgnitePredicate<Cache.Entry<K, V>>[] filter) { - return dht.lockAllAsync(keys, timeout, filter); + return dht.lockAllAsync(null, timeout, filter); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index a32b6a8..b8f8c6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -309,7 +309,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda // init() will register future for responses if future has remote mappings. fut.init(); - return ctx.wrapCloneMap(fut); + return fut; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 2f6bebb..2d859a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -138,7 +138,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { if (isNew() || !valid(topVer)) { // Version does not change for load ops. - update(e.value(), null, e.expireTime(), e.ttl(), e.isNew() ? ver : e.version()); + update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version()); if (cctx.deferredDelete()) { boolean deleted = val == null; @@ -176,7 +176,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { * This method should be called only when lock is owned on this entry. * * @param val Value. - * @param valBytes Value bytes. * @param ver Version. * @param dhtVer DHT version. * @param primaryNodeId Primary node ID. @@ -185,27 +184,23 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { * @throws IgniteCheckedException If failed. */ @SuppressWarnings( {"RedundantTypeArguments"}) - public boolean resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer, - UUID primaryNodeId) throws GridCacheEntryRemovedException, IgniteCheckedException { + public boolean resetFromPrimary(CacheObject val, + GridCacheVersion ver, + GridCacheVersion dhtVer, + UUID primaryNodeId) + throws GridCacheEntryRemovedException, IgniteCheckedException + { assert dhtVer != null; cctx.versions().onReceived(primaryNodeId, dhtVer); -// TODO IGNITE-51. -// if (valBytes != null && val == null && !cctx.config().isStoreValueBytes()) { -// GridCacheVersion curDhtVer = dhtVersion(); -// -// if (!F.eq(dhtVer, curDhtVer)) -// val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); -// } - synchronized (this) { checkObsolete(); this.primaryNodeId = primaryNodeId; if (!F.eq(this.dhtVer, dhtVer)) { - value(val, valBytes); + value(val); this.ver = ver; this.dhtVer = dhtVer; @@ -222,14 +217,12 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { * * @param dhtVer DHT version. * @param val Value associated with version. - * @param valBytes Value bytes. * @param expireTime Expire time. * @param ttl Time to live. * @param primaryNodeId Primary node ID. */ public void updateOrEvict(GridCacheVersion dhtVer, @Nullable CacheObject val, - @Nullable byte[] valBytes, long expireTime, long ttl, UUID primaryNodeId) @@ -248,7 +241,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { // If cannot evict, then update. if (this.dhtVer == null) { if (!markObsolete(dhtVer)) { - value(val, valBytes); + value(val); ttlAndExpireTimeExtras((int) ttl, expireTime); @@ -396,7 +389,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { // Change entry only if dht version has changed. if (!dhtVer.equals(dhtVersion())) { - update(val, valBytes, expireTime, ttl, ver); + update(val, expireTime, ttl, ver); if (cctx.deferredDelete()) { boolean deleted = val == null && valBytes == null; @@ -429,7 +422,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime, + @Override protected void updateIndex(CacheObject val, long expireTime, GridCacheVersion ver, CacheObject old) throws IgniteCheckedException { // No-op: queries are disabled for near cache. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index b880966..d89b59d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1029,7 +1029,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Lock is held at this point, so we can set the // returned value if any. - entry.resetFromPrimary(newVal, null, lockVer, dhtVer, node.id()); + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id()); entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), res.pending()); @@ -1387,7 +1387,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Lock is held at this point, so we can set the // returned value if any. - entry.resetFromPrimary(newVal, null, lockVer, dhtVer, node.id()); + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id()); if (inTx() && implicitTx() && tx.onePhaseCommit()) { boolean pass = res.filterResult(i); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 6e88c4f..e35e0fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -116,7 +116,12 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { - return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals)); + return tx.getAllAsync(ctx, + ctx.cacheKeysView(keys), + entry, + deserializePortable, + skipVals, + false); } }); } @@ -400,7 +405,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> /** {@inheritDoc} */ @Override protected IgniteInternalFuture<Boolean> lockAllAsync( - Collection<? extends K> keys, + Collection<KeyCacheObject> keys, long timeout, IgniteTxLocalEx tx, boolean isInvalidate, @@ -411,9 +416,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> IgnitePredicate<Cache.Entry<K, V>>[] filter ) { GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx, - // TODO IGNITE-51 - // keys, - null, + keys, (GridNearTxLocal)tx, isRead, retval, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index d899562..8d6800d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -280,7 +280,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> loadMissing( - GridCacheContext cacheCtx, + final GridCacheContext cacheCtx, boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, @@ -288,69 +288,67 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { boolean skipVals, final IgniteBiInClosure<KeyCacheObject, Object> c ) { - return null; -// TODO IGNITE-51. -// if (cacheCtx.isNear()) { -// return cacheCtx.nearTx().txLoadAsync(this, -// keys, -// readThrough, -// deserializePortable, -// accessPolicy(cacheCtx, keys), -// skipVals).chain(new C1<IgniteInternalFuture<Map<K, V>>, Boolean>() { -// @Override public Boolean apply(IgniteInternalFuture<Map<K, V>> f) { -// try { -// Map<K, V> map = f.get(); -// -// // Must loop through keys, not map entries, -// // as map entries may not have all the keys. -// for (K key : keys) -// c.apply(key, map.get(key)); -// -// return true; -// } -// catch (Exception e) { -// setRollbackOnly(); -// -// throw new GridClosureException(e); -// } -// } -// }); -// } -// else if (cacheCtx.isColocated()) { -// return cacheCtx.colocated().loadAsync(keys, -// readThrough, -// /*reload*/false, -// /*force primary*/false, -// topologyVersion(), -// CU.subjectId(this, cctx), -// resolveTaskName(), -// deserializePortable, -// accessPolicy(cacheCtx, keys), -// skipVals).chain(new C1<IgniteInternalFuture<Map<K, V>>, Boolean>() { -// @Override public Boolean apply(IgniteInternalFuture<Map<K, V>> f) { -// try { -// Map<K, V> map = f.get(); -// -// // Must loop through keys, not map entries, -// // as map entries may not have all the keys. -// for (K key : keys) -// c.apply(key, map.get(key)); -// -// return true; -// } -// catch (Exception e) { -// setRollbackOnly(); -// -// throw new GridClosureException(e); -// } -// } -// }); -// } -// else { -// assert cacheCtx.isLocal(); -// -// return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c); -// } + if (cacheCtx.isNear()) { + return cacheCtx.nearTx().txLoadAsync(this, + keys, + readThrough, + deserializePortable, + accessPolicy(cacheCtx, keys), + skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { + @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { + try { + Map<Object, Object> map = f.get(); + + // Must loop through keys, not map entries, + // as map entries may not have all the keys. + for (KeyCacheObject key : keys) + c.apply(key, map.get(key.value(cacheCtx, false))); + + return true; + } + catch (Exception e) { + setRollbackOnly(); + + throw new GridClosureException(e); + } + } + }); + } + else if (cacheCtx.isColocated()) { + return cacheCtx.colocated().loadAsync(keys, + readThrough, + /*reload*/false, + /*force primary*/false, + topologyVersion(), + CU.subjectId(this, cctx), + resolveTaskName(), + deserializePortable, + accessPolicy(cacheCtx, keys), + skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { + @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { + try { + Map<Object, Object> map = f.get(); + + // Must loop through keys, not map entries, + // as map entries may not have all the keys. + for (KeyCacheObject key : keys) + c.apply(key, map.get(key.value(cacheCtx, false))); + + return true; + } + catch (Exception e) { + setRollbackOnly(); + + throw new GridClosureException(e); + } + } + }); + } + else { + assert cacheCtx.isLocal(); + + return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 9af91c6..491a171 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -949,7 +949,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue(); - nearEntry.resetFromPrimary(tup.get2(), null, tx.xidVersion(), + nearEntry.resetFromPrimary(tup.get2(), tx.xidVersion(), tup.get1(), m.node().id()); } else if (txEntry.cached().detached()) { @@ -957,7 +957,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue(); - detachedEntry.resetFromPrimary(tup.get2(), null, tx.xidVersion()); + detachedEntry.resetFromPrimary(tup.get2(), tx.xidVersion()); } break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 42e8600..e4111ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -257,61 +257,61 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { } switch (writer.state()) { - case 23: + case 24: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 24: + case 25: if (!writer.writeBoolean("implicitSingle", implicitSingle)) return false; writer.incrementState(); - case 25: + case 26: if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); - case 26: + case 27: if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) return false; writer.incrementState(); - case 27: + case 28: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 28: + case 29: if (!writer.writeBoolean("near", near)) return false; writer.incrementState(); - case 29: + case 30: if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); - case 30: + case 31: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 31: + case 32: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 32: + case 33: if (!writer.writeLong("topVer", topVer)) return false; @@ -333,7 +333,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { return false; switch (reader.state()) { - case 23: + case 24: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -341,7 +341,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 24: + case 25: implicitSingle = reader.readBoolean("implicitSingle"); if (!reader.isLastRead()) @@ -349,7 +349,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 25: + case 26: last = reader.readBoolean("last"); if (!reader.isLastRead()) @@ -357,7 +357,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 26: + case 27: lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID); if (!reader.isLastRead()) @@ -365,7 +365,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 27: + case 28: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -373,7 +373,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 28: + case 29: near = reader.readBoolean("near"); if (!reader.isLastRead()) @@ -381,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 29: + case 30: retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) @@ -389,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 30: + case 31: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -397,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 31: + case 32: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -405,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 32: + case 33: topVer = reader.readLong("topVer"); if (!reader.isLastRead()) @@ -425,7 +425,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 33; + return 34; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 273a184..78cfb73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -224,14 +224,16 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - GridCacheContext cctx = ctx.cacheContext(cacheId); - if (ownedVals != null && ownedValsBytes == null) { ownedValsBytes = new ArrayList<>(ownedVals.size()); for (Map.Entry<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> entry : ownedVals.entrySet()) { IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue(); + GridCacheContext cctx = ctx.cacheContext(entry.getKey().cacheId()); + + entry.getKey().prepareMarshal(cctx); + CacheObject val = tup.get2(); if (val != null) @@ -245,8 +247,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse retValBytes = ctx.marshaller().marshal(retVal); if (filterFailedKeys != null) { - for (IgniteTxKey key :filterFailedKeys) + for (IgniteTxKey key :filterFailedKeys) { + GridCacheContext cctx = ctx.cacheContext(key.cacheId()); + key.prepareMarshal(cctx); + } } } @@ -254,8 +259,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - GridCacheContext cctx = ctx.cacheContext(cacheId); - if (ownedValsBytes != null && ownedVals == null) { ownedVals = new HashMap<>(); @@ -264,6 +267,10 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse CacheObject val = tup.get3(); + GridCacheContext cctx = ctx.cacheContext(tup.get1().cacheId()); + + tup.get1().finishUnmarshal(cctx, ldr); + if (val != null) val.finishUnmarshal(cctx, ldr); @@ -275,8 +282,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse retVal = ctx.marshaller().unmarshal(retValBytes, ldr); if (filterFailedKeys != null) { - for (IgniteTxKey key :filterFailedKeys) + for (IgniteTxKey key :filterFailedKeys) { + GridCacheContext cctx = ctx.cacheContext(key.cacheId()); + key.finishUnmarshal(cctx, ldr); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 7a0c5d8..4c59437 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -104,7 +104,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<? extends K> keys, + @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys, long timeout, IgniteTxLocalEx tx, boolean isRead, @@ -121,7 +121,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { IgnitePredicate<Cache.Entry<K, V>>[] filter) { IgniteTxLocalEx tx = ctx.tm().localTx(); - return lockAllAsync(keys, timeout, tx, filter); + return lockAllAsync(ctx.cacheKeysView(keys), timeout, tx, filter); } /** @@ -131,7 +131,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { * @param filter Filter. * @return Future. */ - public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + public IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys, long timeout, @Nullable IgniteTxLocalEx tx, IgnitePredicate<Cache.Entry<K, V>>[] filter) { @@ -141,12 +141,12 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter); try { - for (K key : keys) { + for (KeyCacheObject key : keys) { while (true) { GridLocalCacheEntry entry = null; try { - entry = entryExx(ctx.toCacheKeyObject(key)); + entry = entryExx(key); if (!ctx.isAll(entry, filter)) { fut.onFailed(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index b2797a4..4e769e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -107,7 +107,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> */ GridLocalLockFuture( GridCacheContext<K, V> cctx, - Collection<? extends K> keys, + Collection<KeyCacheObject> keys, IgniteTxLocalEx tx, GridLocalCache<K, V> cache, long timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 833f8c9..8aa6a63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1014,7 +1014,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { try { entry = entryEx(cacheKey); - GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( + GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( ver, val == null ? DELETE : op, val, @@ -1452,7 +1452,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { assert writeVal != null || op == DELETE : "null write value found."; - GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( + GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( ver, op, writeVal, @@ -1468,12 +1468,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { taskName); if (intercept) { - if (op == UPDATE) + if (op == UPDATE) { ctx.config().getInterceptor().onAfterPut(entry.key().value(ctx, false), - writeVal.value(ctx, false)); + writeVal.value(ctx, false)); + } else - ctx.config().getInterceptor().onAfterRemove(entry.key().value(ctx, false), - CU.value(t.get2(), ctx, false)); + ctx.config().getInterceptor().onAfterRemove(entry.key().value(ctx, false), t.get2()); } } catch (GridCacheEntryRemovedException ignore) { @@ -1555,7 +1555,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<? extends K> keys, + @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys, long timeout, IgniteTxLocalEx tx, boolean isRead, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index c26ca4b..563d38c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1193,7 +1193,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * @throws IgniteCheckedException If failed to get previous value for transform. * @throws GridCacheEntryRemovedException If entry was concurrently deleted. */ - protected GridTuple3<GridCacheOperation, CacheObject, byte[]> applyTransformClosures( + protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures( IgniteTxEntry txEntry, boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException { GridCacheContext cacheCtx = txEntry.context(); @@ -1201,10 +1201,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter assert cacheCtx != null; if (isSystemInvalidate()) - return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null, null); + return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null); if (F.isEmpty(txEntry.entryProcessors())) - return F.t(txEntry.op(), txEntry.value(), null); + return F.t(txEntry.op(), txEntry.value()); else { try { boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); @@ -1246,14 +1246,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter modified |= invokeEntry.modified(); } - if (modified) { - cacheVal = cacheCtx.toCacheObject(val); -// TODO IGNITE-51 -// val = (V)cacheCtx.<V>unwrapTemporary(val); -// -// if (cacheCtx.portableEnabled()) -// val = (V)cacheCtx.marshalToPortable(val); - } + if (modified) + cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val)); GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; @@ -1270,7 +1264,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } } - return F.t(op, cacheVal, null); + return F.t(op, cacheVal); } catch (GridCacheFilterFailedException e) { assert false : "Empty filter failed for innerGet: " + e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 6b1881d..66eea11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -1044,7 +1044,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException { hasWriteVal = in.readBoolean(); - val = (CacheObject)in.readObject(); + if (hasWriteVal) + val = (CacheObject)in.readObject(); op = fromOrdinal(in.readInt()); // TODO IGNITE-51.