# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/832f114e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/832f114e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/832f114e Branch: refs/heads/ignite-51 Commit: 832f114e8eb6e2cfeaccefd632de783d46e83768 Parents: a265949 Author: sboikov <sboi...@gridgain.com> Authored: Thu Feb 26 12:21:20 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Feb 26 17:06:22 2015 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 4 +- .../communication/GridIoMessageFactory.java | 5 + .../cache/CacheInvokeDirectResult.java | 203 ++++++++++ .../internal/processors/cache/CacheObject.java | 16 + .../processors/cache/CacheObjectImpl.java | 59 +-- .../processors/cache/GridCacheAdapter.java | 368 +++++++++++++++---- .../processors/cache/GridCacheContext.java | 36 +- .../processors/cache/GridCacheMapEntry.java | 52 ++- .../processors/cache/GridCacheProjectionEx.java | 8 +- .../cache/GridCacheProjectionImpl.java | 12 +- .../processors/cache/GridCacheProxyImpl.java | 8 +- .../cache/GridCacheUpdateAtomicResult.java | 6 +- .../processors/cache/KeyCacheObjectImpl.java | 15 +- .../processors/cache/UserCacheObjectImpl.java | 53 +++ .../cache/UserKeyCacheObjectImpl.java | 46 +++ .../distributed/dht/GridDhtCacheAdapter.java | 17 +- .../cache/distributed/dht/GridDhtGetFuture.java | 111 +++--- .../dht/GridPartitionedGetFuture.java | 136 +++---- .../dht/atomic/GridDhtAtomicCache.java | 52 ++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 49 ++- .../atomic/GridNearAtomicUpdateResponse.java | 113 ++++-- .../dht/colocated/GridDhtColocatedCache.java | 10 +- .../distributed/near/GridNearAtomicCache.java | 8 +- .../distributed/near/GridNearGetFuture.java | 1 - .../distributed/near/GridNearGetRequest.java | 92 +++-- .../distributed/near/GridNearGetResponse.java | 21 +- .../local/atomic/GridLocalAtomicCache.java | 12 +- .../portable/GridPortableProcessor.java | 7 +- .../portable/os/GridOsPortableProcessor.java | 13 +- 29 files changed, 1092 insertions(+), 441 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 7962a4b..cf86b78 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -18,6 +18,8 @@ package org.apache.ignite.codegen; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -139,7 +141,7 @@ public class MessageCodeGenerator { MessageCodeGenerator gen = new MessageCodeGenerator(srcDir); - gen.generateAll(true); + gen.generateAndWrite(GridNearAtomicUpdateResponse.class); // gen.generateAndWrite(GridDistributedLockRequest.class); // gen.generateAndWrite(GridDistributedLockResponse.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index aad555a..8451413 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -512,6 +512,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 93: + msg = new CacheInvokeDirectResult(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java new file mode 100644 index 0000000..a67abce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + +import java.nio.*; +import javax.cache.processor.*; + +/** + * + */ +public class CacheInvokeDirectResult implements Message { + /** */ + private KeyCacheObject key; + + /** */ + @GridToStringInclude + private CacheObject res; + + /** */ + @GridToStringInclude + @GridDirectTransient + private Exception err; + + /** */ + private byte[] errBytes; + + /** + * Required for {@link Message}. + */ + public CacheInvokeDirectResult() { + // No-op. + } + + /** + * @param key Key. + * @param res Result. + */ + public CacheInvokeDirectResult(KeyCacheObject key, CacheObject res) { + this.key = key; + this.res = res; + } + + /** + * @param key Key. + * @param err Exception thrown by {@link EntryProcessor#process(MutableEntry, Object...)}. + */ + public CacheInvokeDirectResult(KeyCacheObject key, Exception err) { + this.key = key; + this.err = err; + } + + /** + * @return Key. + */ + public KeyCacheObject key() { + return key; + } + + /** + * @return Result. + */ + public CacheObject result() { + return res; + } + + /** + * @return Error. + */ + @Nullable public Exception error() { + return err; + } + + /** {@inheritDoc} */ + public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + key.prepareMarshal(ctx); + + if (err != null) + errBytes = ctx.marshaller().marshal(err); + + if (res != null) + res.prepareMarshal(ctx); + } + + /** {@inheritDoc} */ + public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + key.finishUnmarshal(ctx, ldr); + + if (errBytes != null) + err = ctx.marshaller().unmarshal(errBytes, ldr); + + if (res != null) + res.finishUnmarshal(ctx, ldr); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 93; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("key", key)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeMessage("res", res)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + key = reader.readMessage("key"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + res = reader.readMessage("res"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheInvokeDirectResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java index cda9e86..425cf09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java @@ -37,7 +37,23 @@ public interface CacheObject extends Message { */ @Nullable public <T> T getField(String name); + /** + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException; + /** + * @param ctx Context. + * @param ldr Class loader. + * @throws IgniteCheckedException If failed. + */ public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException; + + /** + * @param ctx Cache context. + * + * @return Instance to store in cache. + */ + public CacheObject prepareForCache(GridCacheContext ctx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java index 9e24b43..52c0d69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java @@ -34,10 +34,10 @@ public class CacheObjectImpl implements CacheObject, Externalizable { /** */ @GridToStringInclude @GridDirectTransient - private Object val; + protected Object val; /** */ - private byte[] valBytes; + protected byte[] valBytes; /** * @@ -48,14 +48,13 @@ public class CacheObjectImpl implements CacheObject, Externalizable { /** * @param val Value. + * @param valBytes Value bytes. */ - public CacheObjectImpl(Object val) { - assert val != null; + public CacheObjectImpl(Object val, byte[] valBytes) { + assert val != null || valBytes != null; - if (val instanceof byte[]) - valBytes = (byte[])val; - else - this.val = val; + this.val = val; + this.valBytes = valBytes; } /** {@inheritDoc} */ @@ -69,31 +68,33 @@ public class CacheObjectImpl implements CacheObject, Externalizable { @Nullable @Override public <T> T value(GridCacheContext ctx) { if (val != null) return (T)val; - else - return (T)valBytes; + + assert valBytes != null; + + try { + val = ctx.marshaller().unmarshal(valBytes, U.gridClassLoader()); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal object.", e); + } + + return (T)val; } /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - if (valBytes == null) + if (!(val instanceof byte[])) valBytes = CU.marshal(ctx, val); } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - assert valBytes != null; - - boolean byteArr = val != null; - - if (byteArr) - val = null; - else - val = ctx.marshaller().unmarshal(valBytes, ldr); + assert val != null || valBytes != null; } /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - assert valBytes != null; + assert val != null || valBytes != null; writer.setBuffer(buf); @@ -104,16 +105,16 @@ public class CacheObjectImpl implements CacheObject, Externalizable { writer.onHeaderWritten(); } + boolean byteArr = val instanceof byte[]; + switch (writer.state()) { case 0: - if (!writer.writeByteArray("valBytes", valBytes)) + if (!writer.writeByteArray("valBytes", byteArr ? (byte[])val : valBytes)) return false; writer.incrementState(); case 1: - boolean byteArr = val == null; - if (!writer.writeBoolean("byteArr", byteArr)) return false; @@ -146,9 +147,12 @@ public class CacheObjectImpl implements CacheObject, Externalizable { if (!reader.isLastRead()) return false; - if (byteArr) + if (byteArr) { val = valBytes; + valBytes = null; + } + reader.incrementState(); } @@ -180,15 +184,22 @@ public class CacheObjectImpl implements CacheObject, Externalizable { return super.equals(obj); } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { assert false; } + /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { assert false; } /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(GridCacheContext ctx) { + return this; + } + + /** {@inheritDoc} */ public String toString() { return S.toString(CacheObjectImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index bfffaae..7121ac7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1875,7 +1875,168 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** + * @param keys Keys. + * @param ret Return flag. + * @param skipVals Skip values flag. + * @param subjId Subject ID. + * @param taskName Task name. + * @return Future. + */ + public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> reloadAllAsync0( + Collection<KeyCacheObject> keys, + boolean ret, + boolean skipVals, + @Nullable UUID subjId, + String taskName) + { + final long topVer = ctx.affinity().affinityTopologyVersion(); + + if (!F.isEmpty(keys)) { + final String uid = CU.uuid(); // Get meta UUID for this thread. + + assert keys != null; + + for (KeyCacheObject key : keys) { + if (key == null) + continue; + + // Skip primary or backup entries for near cache. + if (ctx.isNear() && ctx.affinity().localNode(key, topVer)) + continue; + + while (true) { + try { + GridCacheEntryEx entry = entryExSafe(key, topVer); + + if (entry == null) + break; + + // Get version before checking filer. + GridCacheVersion ver = entry.version(); + + // Tag entry with current version. + entry.addMeta(uid, ver); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry for reload (will retry): " + key); + } + catch (GridDhtInvalidPartitionException ignore) { + if (log.isDebugEnabled()) + log.debug("Got invalid partition for key (will skip): " + key); + + break; + } + } + } + + final Map<KeyCacheObject, CacheObject> map = + ret ? U.<KeyCacheObject, CacheObject>newHashMap(keys.size()) : null; + + final Collection<KeyCacheObject> absentKeys = F.view(keys, CU.keyHasMeta(ctx, uid)); + + final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>(); + + IgniteInternalFuture<Object> readFut = + readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() { + /** Version for all loaded entries. */ + private GridCacheVersion nextVer = ctx.versions().next(); + + /** {@inheritDoc} */ + @Override public void apply(KeyCacheObject key, Object val) { + loadedKeys.add(key); + + GridCacheEntryEx entry = peekEx(key); + + if (entry != null) { + try { + GridCacheVersion curVer = entry.removeMeta(uid); + + // If entry passed the filter. + if (curVer != null) { + boolean wasNew = entry.isNewLocked(); + + entry.unswap(); + + CacheObject cacheVal = ctx.toCacheObject(val); + + boolean set = entry.versionedValue(cacheVal, curVer, nextVer); + + ctx.evicts().touch(entry, topVer); + + if (map != null) { + if (set || wasNew) + map.put(key, cacheVal); + else { + try { + GridTuple<CacheObject> v = entry.peek0(false, GLOBAL, null, null); + + if (v != null) + map.put(key, v.get()); + } + catch (GridCacheFilterFailedException ex) { + ex.printStackTrace(); + + assert false; + } + } + } + + if (log.isDebugEnabled()) { + log.debug("Set value loaded from store into entry [set=" + set + ", " + + "curVer=" + + curVer + ", newVer=" + nextVer + ", entry=" + entry + ']'); + } + } + else { + if (log.isDebugEnabled()) { + log.debug("Current version was not found (either entry was removed or " + + "validation was not passed: " + entry); + } + } + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) { + log.debug("Got removed entry for reload (will not store reloaded entry) " + + "[entry=" + entry + ']'); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + }); + + return readFut.chain(new CX1<IgniteInternalFuture<Object>, Map<KeyCacheObject, CacheObject>>() { + @Override public Map<KeyCacheObject, CacheObject> applyx(IgniteInternalFuture<Object> e) + throws IgniteCheckedException { + // Touch all not loaded keys. + for (KeyCacheObject key : absentKeys) { + if (!loadedKeys.contains(key)) { + GridCacheEntryEx entry = peekEx(key); + + if (entry != null) + ctx.evicts().touch(entry, topVer); + } + } + + // Make sure there were no exceptions. + e.get(); + + return map; + } + }); + } + + return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<KeyCacheObject, CacheObject>emptyMap()); + } + + /** * @param key Key. + * @param topVer Topology version. * @return Entry. */ @Nullable protected GridCacheEntryEx entryExSafe(KeyCacheObject key, long topVer) { @@ -2060,21 +2221,59 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final boolean forcePrimary, @Nullable IgniteCacheExpiryPolicy expiry, final boolean skipVals - ) { + ) { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); ctx.denyOnFlag(LOCAL); - // Entry must be passed for one key only. - assert cached == null || keys.size() == 1; - assert ctx.portableEnabled() || cached == null || F.first(keys).equals(cached.key()); - - if (F.isEmpty(keys)) - return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - if (keyCheck) validateCacheKeys(keys); + Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() { + @Override public KeyCacheObject apply(K key) { + if (key == null) + throw new NullPointerException("Null key."); + + return ctx.toCacheKeyObject(key); + } + }); + + return getAllAsync0(keys0, + readThrough, + checkTx, + subjId, + taskName, + deserializePortable, + expiry, + skipVals, + false); + } + + /** + * @param keys Keys. + * @param readThrough Read-through flag. + * @param checkTx Check local transaction flag. + * @param subjId Subject ID. + * @param taskName Task name/ + * @param deserializePortable Deserialize portable flag. + * @param expiry Expiry policy. + * @param skipVals Skip values flag. + * @param keepCacheObjects Keep cache objects + * @return + */ + public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys, + boolean readThrough, + boolean checkTx, + @Nullable final UUID subjId, + final String taskName, + final boolean deserializePortable, + @Nullable IgniteCacheExpiryPolicy expiry, + final boolean skipVals, + final boolean keepCacheObjects + ) { + if (F.isEmpty(keys)) + return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K1, V1>emptyMap()); + IgniteTxLocalAdapter tx = null; if (checkTx) { @@ -2094,26 +2293,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); - final Map<K, V> map = new GridLeanMap<>(keys.size()); + final Map<K1, V1> map = new GridLeanMap<>(keys.size()); Map<KeyCacheObject, GridCacheVersion> misses = null; - for (K key : keys) { - if (key == null) - throw new NullPointerException("Null key."); - - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - + for (KeyCacheObject key : keys) { while (true) { - GridCacheEntryEx entry; - - if (cached != null) { - entry = cached; - - cached = null; - } - else - entry = entryEx(cacheKey); + GridCacheEntryEx entry = entryEx(key); try { CacheObject val = entry.innerGet(null, @@ -2135,18 +2321,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (misses == null) misses = new GridLeanMap<>(); - misses.put(cacheKey, ver); + misses.put(key, ver); } else { - K key0 = key; - V val0 = val.value(ctx); - - if (ctx.portableEnabled() && deserializePortable) { - val0 = (V)ctx.unwrapPortableIfNeeded(val0, false); - key0 = (K)ctx.unwrapPortableIfNeeded(key0, false); - } - - map.put(key0, val0); + ctx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable); if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) ctx.evicts().touch(entry, topVer); @@ -2181,10 +2359,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final Collection<KeyCacheObject> loaded = new HashSet<>(); - return new GridEmbeddedFuture<>( + return new GridEmbeddedFuture( ctx.kernalContext(), - ctx.closures().callLocalSafe(ctx.projectSafe(new GPC<Map<K, V>>() { - @Override public Map<K, V> call() throws Exception { + ctx.closures().callLocalSafe(ctx.projectSafe(new GPC<Map<K1, V1>>() { + @Override public Map<K1, V1> call() throws Exception { ctx.store().loadAllFromStore(null/*tx*/, loadKeys.keySet(), new CI2<KeyCacheObject, Object>() { /** New version for all new entries. */ private GridCacheVersion nextVer; @@ -2220,8 +2398,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, "entry=" + entry + ']'); // Don't put key-value pair into result map if value is null. - if (val != null) - map.put(key.<K>value(ctx), (V)val); + if (val != null) { + ctx.addResult(map, + key, + cacheVal, + skipVals, + keepCacheObjects, + deserializePortable); + } if (tx0 == null || (!tx0.implicit() && tx0.isolation() == READ_COMMITTED)) @@ -2284,8 +2468,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, V>emptyMap()); } }, - new C2<Map<K, V>, Exception, Map<K, V>>() { - @Override public Map<K, V> apply(Map<K, V> loaded, Exception e) { + new C2<Map<K1, V1>, Exception, Map<K1, V1>>() { + @Override public Map<K1, V1> apply(Map<K1, V1> loaded, Exception e) { if (e == null) map.putAll(loaded); @@ -2313,13 +2497,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } } else { - final GridCacheEntryEx cached0 = cached; - - return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { - @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { - return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, cached0, deserializePortable, skipVals)); - } - }); + return null; +// TODO IGNITE-51. +// return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { +// @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { +// return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, cached0, deserializePortable, skipVals)); +// } +// }); } } @@ -2447,13 +2631,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ctx.denyOnLocalRead(); Boolean stored = syncOp(new SyncOp<Boolean>(true) { - @Override - public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).get().success(); } - @Override - public String toString() { + @Override public String toString() { return "putx [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; } }); @@ -3365,7 +3547,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> removex(final K key, final V val) throws IgniteCheckedException { + @Override public GridCacheReturn<V> removex(final K key, final V val) throws IgniteCheckedException { ctx.denyOnLocalRead(); A.notNull(key, "key", val, "val"); @@ -3373,14 +3555,23 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (keyCheck) validateCacheKey(key); - return syncOp(new SyncOp<GridCacheReturn<CacheObject>>(true) { - @Override public GridCacheReturn<CacheObject> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + return syncOp(new SyncOp<GridCacheReturn<V>>(true) { + @Override public GridCacheReturn<V> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); - return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, + GridCacheReturn ret = tx.removeAllAsync(ctx, + Collections.singletonList(key), + null, + true, ctx.vararg(F.<K, V>cacheContainsPeek(val))).get(); + + CacheObject val = (CacheObject)ret.value(); + + ret.value(CU.value(val, ctx)); + + return ret; } @Override public String toString() { @@ -3432,7 +3623,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> replacex(final K key, final V oldVal, final V newVal) + @Override public GridCacheReturn<V> replacex(final K key, final V oldVal, final V newVal) throws IgniteCheckedException { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); @@ -3442,13 +3633,20 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ctx.denyOnLocalRead(); - return syncOp(new SyncOp<GridCacheReturn<CacheObject>>(true) { - @Override public GridCacheReturn<CacheObject> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + return syncOp(new SyncOp<GridCacheReturn<V>>(true) { + @Override public GridCacheReturn<V> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return tx.putAllAsync(ctx, F.t(key, newVal), true, null, -1, ctx.equalsPeekArray(oldVal)).get(); + GridCacheReturn ret = + tx.putAllAsync(ctx, F.t(key, newVal), true, null, -1, ctx.equalsPeekArray(oldVal)).get(); + + CacheObject val = (CacheObject)ret.value(); + + ret.value(CU.value(val, ctx)); + + return ret; } @Override public String toString() { @@ -3458,7 +3656,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(final K key, final V val) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(final K key, final V val) { ctx.denyOnLocalRead(); A.notNull(key, "key", val, "val"); @@ -3466,8 +3664,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (keyCheck) validateCacheKey(key); - return asyncOp(new AsyncOp<GridCacheReturn<CacheObject>>(key) { - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> op(IgniteTxLocalAdapter tx) { + return asyncOp(new AsyncOp<GridCacheReturn<V>>(key) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> op(IgniteTxLocalAdapter tx) { // Register before hiding in the filter. try { if (ctx.deploymentEnabled()) @@ -3477,8 +3675,24 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext(), e); } - return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, + IgniteInternalFuture<GridCacheReturn<CacheObject>> fut = tx.removeAllAsync(ctx, + Collections.singletonList(key), + null, + true, ctx.vararg(F.<K, V>cacheContainsPeek(val))); + + return fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn<CacheObject>>, GridCacheReturn<V>>() { + @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<GridCacheReturn<CacheObject>> fut) + throws IgniteCheckedException { + GridCacheReturn ret = fut.get(); + + CacheObject val = (CacheObject)ret.value(); + + ret.value(CU.value(val, ctx)); + + return ret; + } + }); } @Override public String toString() { @@ -3488,7 +3702,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(final K key, + @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(final K key, final V oldVal, final V newVal) { @@ -3499,8 +3713,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ctx.denyOnLocalRead(); - return asyncOp(new AsyncOp<GridCacheReturn<CacheObject>>(key) { - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> op(IgniteTxLocalAdapter tx) { + return asyncOp(new AsyncOp<GridCacheReturn<V>>(key) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> op(IgniteTxLocalAdapter tx) { // Register before hiding in the filter. try { if (ctx.deploymentEnabled()) @@ -3510,7 +3724,25 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext(), e); } - return tx.putAllAsync(ctx, F.t(key, newVal), true, null, -1, ctx.equalsPeekArray(oldVal)); + IgniteInternalFuture<GridCacheReturn<CacheObject>> fut = tx.putAllAsync(ctx, + F.t(key, newVal), + true, + null, + -1, + ctx.equalsPeekArray(oldVal)); + + return fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn<CacheObject>>, GridCacheReturn<V>>() { + @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<GridCacheReturn<CacheObject>> fut) + throws IgniteCheckedException { + GridCacheReturn ret = fut.get(); + + CacheObject val = (CacheObject)ret.value(); + + ret.value(CU.value(val, ctx)); + + return ret; + } + }); } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 64fe93f..a17c366 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1784,9 +1784,39 @@ public class GridCacheContext<K, V> implements Externalizable { return portable().toCacheKeyObject(obj); } - // TODO IGNITE-51. - public boolean copyOnGet() { - return false; + /** + * @param map Map. + * @param key Key. + * @param val Value. + * @param skipVals Skip values flag. + * @param keepCacheObjects Keep cache objects flag. + * @param deserializePortable Deserialize portable flag. + */ + @SuppressWarnings("unchecked") + public <K1, V1> void addResult(Map<K1, V1> map, + KeyCacheObject key, + CacheObject val, + boolean skipVals, + boolean keepCacheObjects, + boolean deserializePortable) { + assert key != null; + assert val != null; + + if (!keepCacheObjects) { + Object key0 = key.value(this); + Object val0 = skipVals ? Boolean.TRUE : val.value(this); + + if (portableEnabled() && deserializePortable) { + key0 = unwrapPortableIfNeeded(key0, false); + + if (!skipVals) + val0 = unwrapPortableIfNeeded(val0, false); + } + + map.put((K1)key0, (V1)val0); + } + else + map.put((K1)key, (V1)(skipVals ? Boolean.TRUE : val)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 6485185..aa9d100 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -150,8 +150,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { GridCacheMapEntry next, long ttl, int hdrId) { log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class); - if (cctx.portableEnabled()) - key = (KeyCacheObject)cctx.kernalContext().portable().detachPortable(key); + key = (KeyCacheObject)cctx.kernalContext().portable().detachPortable(key, cctx); + + assert key != null; this.key = key; this.hash = hash; @@ -159,8 +160,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { ttlAndExpireTimeExtras(ttl, CU.toExpireTime(ttl)); - if (cctx.portableEnabled()) - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val); + val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); synchronized (this) { value(val, null); @@ -518,7 +518,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { CacheObject val = e.value(); if (cctx.portableEnabled()) - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val); + val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); // Set unswapped value. update(val, e.valueBytes(), e.expireTime(), e.ttl(), e.version()); @@ -840,8 +840,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (startVer.equals(ver)) { if (ret != null) { // Detach value before index update. - if (cctx.portableEnabled()) - ret = (CacheObject)cctx.kernalContext().portable().detachPortable(ret); + ret = (CacheObject)cctx.kernalContext().portable().detachPortable(ret, cctx); GridCacheVersion nextVer = nextVersion(); @@ -924,8 +923,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { long expTime = CU.toExpireTime(ttl); // Detach value before index update. - if (cctx.portableEnabled()) - ret = (CacheObject)cctx.kernalContext().portable().detachPortable(ret); + ret = (CacheObject)cctx.kernalContext().portable().detachPortable(ret, cctx); // Update indexes. if (ret != null) { @@ -1059,8 +1057,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { assert expireTime >= 0 : expireTime; // Detach value before index update. - if (cctx.portableEnabled()) - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val); + val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -1343,8 +1340,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } // Detach value before index update. - if (cctx.portableEnabled()) - old = (CacheObject)cctx.kernalContext().portable().detachPortable(old); + old = (CacheObject)cctx.kernalContext().portable().detachPortable(old, cctx); if (old != null) updateIndex(old, null, expireTime, ver, null); @@ -1460,8 +1456,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Try write-through. if (op == GridCacheOperation.UPDATE) { // Detach value before index update. - if (cctx.portableEnabled()) - updated = (CacheObject)cctx.kernalContext().portable().detachPortable(updated); + updated = (CacheObject)cctx.kernalContext().portable().detachPortable(updated, cctx); if (writeThrough) // Must persist inside synchronization in non-tx mode. @@ -1587,7 +1582,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { GridCacheVersionConflictContext<?, ?> conflictCtx = null; - EntryProcessorResult<?> invokeRes = null; + CacheInvokeDirectResult invokeRes = null; // System TTL/ET which may have special values. long newSysTtl; @@ -1753,8 +1748,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { readThrough = true; // Detach value before index update. - if (cctx.portableEnabled()) - oldVal = (CacheObject)cctx.kernalContext().portable().detachPortable(oldVal); + oldVal = (CacheObject)cctx.kernalContext().portable().detachPortable(oldVal, cctx); // Calculate initial TTL and expire time. long initTtl; @@ -1820,8 +1814,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; + key0 = key.value(cctx); old0 = CU.value(old0, oldVal, cctx); - + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key0, old0); try { @@ -1835,12 +1830,13 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { updated = oldVal; if (computed != null) - invokeRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed)); + invokeRes = new CacheInvokeDirectResult(key, + cctx.toCacheObject(cctx.unwrapTemporary(computed))); valBytes = null; } catch (Exception e) { - invokeRes = new CacheInvokeResult<>(e); + invokeRes = new CacheInvokeDirectResult(key, e); updated = oldVal; @@ -1990,8 +1986,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Do not change size. } - if (cctx.portableEnabled()) - updated = (CacheObject)cctx.kernalContext().portable().detachPortable(updated); + updated = (CacheObject)cctx.kernalContext().portable().detachPortable(updated, cctx); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -2906,8 +2901,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // in load methods without actually holding entry lock. long expireTime = expireTimeExtras(); - if (cctx.portableEnabled()) - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val); + val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); updateIndex(val, null, expireTime, nextVer, old); @@ -3195,8 +3189,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (isNew() || (!preload && deletedUnlocked())) { long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; - if (cctx.portableEnabled()) - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val); + val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); if (val != null || valBytes != null) updateIndex(val, valBytes, expTime, ver, null); @@ -3245,7 +3238,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { CacheObject val = unswapped.value(); if (cctx.portableEnabled()) { - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val); + val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); if (cctx.offheapTiered() && !unswapped.valueIsByteArray()) unswapped.valueBytes(cctx.convertPortableBytes(unswapped.valueBytes())); @@ -3293,8 +3286,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { long expTime = CU.toExpireTime(ttl); // Detach value before index update. - if (cctx.portableEnabled()) - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val); + val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); if (val != null) { updateIndex(val, null, expTime, newVer, old); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java index 6cd73b1..fe72bef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java @@ -212,7 +212,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws NullPointerException If either key or value are {@code null}. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal); + public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal); /** * Stores given key-value pair in cache only if only if the previous value is equal to the @@ -237,7 +237,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If replace operation failed. * @throws CacheFlagException If projection flags validation failed. */ - public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException; + public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException; /** * Removes given key mapping from cache if one exists and value is equal to the passed in value. @@ -258,7 +258,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If remove failed. * @throws CacheFlagException If projection flags validation failed. */ - public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException; + public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException; /** * Asynchronously removes given key mapping from cache if one exists and value is equal to the passed in value. @@ -282,7 +282,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws NullPointerException if the key or value is {@code null}. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val); + public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val); /** * @param key Key to retrieve the value for. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 168886c..3e8930b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -1164,30 +1164,30 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); // Check k-v predicate first. if (!isAll(key, newVal, true)) - return new GridFinishedFuture<>(cctx.kernalContext(), new GridCacheReturn<CacheObject>(false)); + return new GridFinishedFuture<>(cctx.kernalContext(), new GridCacheReturn<V>(false)); return cache.replacexAsync(key, oldVal, newVal); } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { + @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { return replacexAsync(key, oldVal, newVal).get(); } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException { + @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException { return removexAsync(key, val).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { return !isAll(key, val, true) ? new GridFinishedFuture<>(cctx.kernalContext(), - new GridCacheReturn<CacheObject>(false)) : cache.removexAsync(key, val); + new GridCacheReturn<V>(false)) : cache.removexAsync(key, val); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index d1d773d..bc317de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1517,7 +1517,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1529,7 +1529,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { + @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1541,7 +1541,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException { + @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1553,7 +1553,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index af96e0c..45aeae5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -57,7 +57,7 @@ public class GridCacheUpdateAtomicResult { private final boolean sndToDht; /** Value computed by entry processor. */ - private EntryProcessorResult<?> res; + private CacheInvokeDirectResult res; /** * Constructor. @@ -75,7 +75,7 @@ public class GridCacheUpdateAtomicResult { public GridCacheUpdateAtomicResult(boolean success, @Nullable CacheObject oldVal, @Nullable CacheObject newVal, - @Nullable EntryProcessorResult<?> res, + @Nullable CacheInvokeDirectResult res, long newTtl, long conflictExpireTime, @Nullable GridCacheVersion rmvVer, @@ -95,7 +95,7 @@ public class GridCacheUpdateAtomicResult { /** * @return Value computed by the {@link EntryProcessor}. */ - @Nullable public EntryProcessorResult<?> computedResult() { + @Nullable public CacheInvokeDirectResult computedResult() { return res; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 113c6d1..9a0c971 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -34,10 +34,10 @@ public class KeyCacheObjectImpl implements KeyCacheObject, Externalizable { /** */ @GridToStringInclude @GridDirectTransient - private Object val; + protected Object val; /** */ - private byte[] valBytes; + protected byte[] valBytes; /** * @@ -48,11 +48,13 @@ public class KeyCacheObjectImpl implements KeyCacheObject, Externalizable { /** * @param val Value. + * @param valBytes Value bytes. */ - public KeyCacheObjectImpl(Object val) { + public KeyCacheObjectImpl(Object val, byte[] valBytes) { assert val != null; this.val = val; + this.valBytes = valBytes; } /** {@inheritDoc} */ @@ -73,6 +75,11 @@ public class KeyCacheObjectImpl implements KeyCacheObject, Externalizable { } /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(GridCacheContext ctx) { + return this; + } + + /** {@inheritDoc} */ @Override public int hashCode() { assert val != null; @@ -156,10 +163,12 @@ public class KeyCacheObjectImpl implements KeyCacheObject, Externalizable { return val.equals(other.val); } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { assert false; } + /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { assert false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java new file mode 100644 index 0000000..e5bbe04 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Cache object wrapping object provided by user. Need to be copied before stored in cache. + */ +public class UserCacheObjectImpl extends CacheObjectImpl { + /** + * @param val Value. + */ + public UserCacheObjectImpl(Object val) { + super(val, null); + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(GridCacheContext ctx) { + if (val instanceof byte[]) { + byte[] byteArr = (byte[])val; + + return new CacheObjectImpl(Arrays.copyOf(byteArr, byteArr.length), null); + } + else { + try { + byte[] valBytes = ctx.marshaller().marshal(val); + + return new CacheObjectImpl(null, valBytes); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal object: " + val, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java new file mode 100644 index 0000000..a0216e0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Cache object wrapping key provided by user. Need to be copied before stored in cache. + */ +public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { + /** + * @param val Key value. + */ + public UserKeyCacheObjectImpl(Object val) { + super(val, null); + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(GridCacheContext ctx) { + try { + if (valBytes == null) + valBytes = ctx.marshaller().marshal(val); + + return new KeyCacheObjectImpl(ctx.marshaller().unmarshal(valBytes, U.gridClassLoader()), valBytes); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal object: " + val, e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 568566e..d5d5a29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -547,28 +547,27 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param readThrough Read through flag. * @param subjId Subject ID. * @param taskName Task name. - * @param deserializePortable Deserialize portable flag. * @param expiry Expiry policy. + * @param skipVals Skip values flag. * @return Get future. */ - IgniteInternalFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, + IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> getDhtAllAsync( + Collection<KeyCacheObject> keys, boolean readThrough, @Nullable UUID subjId, String taskName, - boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals ) { - return getAllAsync(keys, + return getAllAsync0(keys, readThrough, - null, /*don't check local tx. */false, subjId, taskName, - deserializePortable, false, expiry, - skipVals); + skipVals, + /*keep cache objects*/true); } /** @@ -580,7 +579,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param topVer Topology version. * @param subjId Subject ID. * @param taskNameHash Task name hash code. - * @param deserializePortable Deserialize portable flag. * @param expiry Expiry policy. * @return DHT future. */ @@ -592,7 +590,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap long topVer, @Nullable UUID subjId, int taskNameHash, - boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals) { GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, @@ -605,7 +602,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap topVer, subjId, taskNameHash, - deserializePortable, expiry, skipVals); @@ -634,7 +630,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap req.topologyVersion(), req.subjectId(), req.taskNameHash(), - false, expiryPlc, req.skipValues()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index fc99639..516b2bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -90,9 +90,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /** Task name. */ private int taskNameHash; - /** Whether to deserialize portable objects. */ - private boolean deserializePortable; - /** Expiry policy. */ private IgniteCacheExpiryPolicy expiryPlc; @@ -117,7 +114,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @param topVer Topology version. * @param subjId Subject ID. * @param taskNameHash Task name hash code. - * @param deserializePortable Deserialize portable flag. * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. */ @@ -132,7 +128,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col long topVer, @Nullable UUID subjId, int taskNameHash, - boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals ) { @@ -150,7 +145,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col this.tx = tx; this.topVer = topVer; this.subjId = subjId; - this.deserializePortable = deserializePortable; this.taskNameHash = taskNameHash; this.expiryPlc = expiryPlc; this.skipVals = skipVals; @@ -173,13 +167,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col markInitialized(); } - /** - * @return Keys. - */ - Collection<KeyCacheObject> keys() { - return keys.keySet(); - } - /** {@inheritDoc} */ @Override public Collection<Integer> invalidPartitions() { return retries; @@ -351,28 +338,26 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (txFut != null) txFut.markInitialized(); - IgniteInternalFuture<Map<K, V>> fut; + IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut = null; if (txFut == null || txFut.isDone()) { - fut = null; + if (reload && cctx.readThrough() && cctx.store().configured()) { + fut = cache().reloadAllAsync0(keys.keySet(), + true, + skipVals, + subjId, + taskName); + } + else { + if (tx == null) { + fut = cache().getDhtAllAsync(keys.keySet(), + readThrough, + subjId, + taskName, + expiryPlc, + skipVals); + } // TODO IGNITE-51. -// if (reload && cctx.readThrough() && cctx.store().configured()) { -// fut = cache().reloadAllAsync(keys.keySet(), -// true, -// skipVals, -// subjId, -// taskName); -// } -// else { -// if (tx == null) { -// fut = cache().getDhtAllAsync(keys.keySet(), -// readThrough, -// subjId, -// taskName, -// deserializePortable, -// expiryPlc, -// skipVals); -// } // else { // fut = tx.getAllAsync(cctx, // keys.keySet(), @@ -380,7 +365,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // deserializePortable, // skipVals); // } -// } + } } else { // If we are here, then there were active transactions for some entries @@ -388,45 +373,44 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // transactions to complete. fut = new GridEmbeddedFuture<>( txFut, - new C2<Boolean, Exception, IgniteInternalFuture<Map<K, V>>>() { - @Override public IgniteInternalFuture<Map<K, V>> apply(Boolean b, Exception e) { - return null; + new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, CacheObject>>>() { + @Override public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> apply(Boolean b, Exception e) { + if (e != null) + throw new GridClosureException(e); + + if (reload && cctx.readThrough() && cctx.store().configured()) { + return cache().reloadAllAsync0(keys.keySet(), + true, + skipVals, + subjId, + taskName); + } + else { + if (tx == null) { + return cache().getDhtAllAsync(keys.keySet(), + readThrough, + subjId, + taskName, + expiryPlc, skipVals); + } + else { + return null; // TODO IGNITE-51. -// if (e != null) -// throw new GridClosureException(e); -// -// if (reload && cctx.readThrough() && cctx.store().configured()) { -// return cache().reloadAllAsync(keys.keySet(), -// true, -// skipVals, -// subjId, -// taskName); -// } -// else { -// if (tx == null) { -// return cache().getDhtAllAsync(keys.keySet(), -// readThrough, -// subjId, -// taskName, -// deserializePortable, -// expiryPlc, skipVals); -// } -// else { // return tx.getAllAsync(cctx, // keys.keySet(), // null, -// deserializePortable, +// false, // skipVals); -// } -// } + } + } } }, cctx.kernalContext()); } return new GridEmbeddedFuture<>(cctx.kernalContext(), fut, - new C2<Map<K, V>, Exception, Collection<GridCacheEntryInfo>>() { - @Override public Collection<GridCacheEntryInfo> apply(Map<K, V> map, Exception e) { + new C2<Map<KeyCacheObject, CacheObject>, Exception, Collection<GridCacheEntryInfo>>() { + @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, CacheObject> map, Exception e) { if (e != null) { onDone(e); @@ -436,13 +420,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) { GridCacheEntryInfo info = it.next(); - // TODO IGNITE-51. - V v = map.get(info.key().value(cctx)); + CacheObject v = map.get(info.key()); if (v == null) it.remove(); else - info.value(cctx.toCacheObject(v)); + info.value(v); } return infos;