# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d7dac569 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d7dac569 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d7dac569 Branch: refs/heads/ignite-410 Commit: d7dac569bf095837294168e66d0387971eeaed71 Parents: e59ea49 Author: sboikov <sboi...@gridgain.com> Authored: Tue Mar 10 10:53:00 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Mar 10 12:47:16 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 5 + .../processors/cache/CacheObjectAdapter.java | 72 ++++++--- .../cache/CacheObjectByteArrayImpl.java | 153 +++++++++++++++++++ .../processors/cache/CacheObjectImpl.java | 98 +----------- .../processors/cache/CacheProjection.java | 15 -- .../processors/cache/GridCacheAdapter.java | 70 +-------- .../processors/cache/GridCacheContext.java | 11 +- .../processors/cache/GridCacheEntryEx.java | 11 -- .../processors/cache/GridCacheEntryInfo.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 43 +----- .../processors/cache/GridCacheMvccManager.java | 2 +- .../cache/GridCacheProjectionImpl.java | 10 -- .../processors/cache/GridCacheProxyImpl.java | 25 --- .../processors/cache/KeyCacheObjectImpl.java | 58 +------ .../processors/cache/UserCacheObjectImpl.java | 81 ---------- .../cache/UserKeyCacheObjectImpl.java | 75 --------- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 6 - .../near/GridNearTxPrepareFuture.java | 14 +- .../query/GridCacheDistributedQueryManager.java | 6 - .../cache/query/GridCacheLocalQueryFuture.java | 3 - .../cache/query/GridCacheQueriesImpl.java | 17 --- .../cache/query/GridCacheQueryAdapter.java | 15 -- .../cache/query/GridCacheQueryInfo.java | 13 -- .../cache/query/GridCacheQueryManager.java | 94 ++---------- .../cache/query/GridCacheQueryRequest.java | 53 ++----- .../transactions/IgniteTxLocalAdapter.java | 2 +- .../dataload/IgniteDataLoaderImpl.java | 16 +- .../datastructures/GridCacheSetImpl.java | 4 +- .../portable/IgniteCacheObjectProcessor.java | 42 ++--- .../IgniteCacheObjectProcessorImpl.java | 153 ++++++++++++++++--- .../handlers/cache/GridCacheCommandHandler.java | 46 +++--- .../handlers/task/GridTaskCommandHandler.java | 3 +- .../protocols/tcp/GridTcpRestNioListener.java | 3 - .../rest/protocols/tcp/GridTcpRestProtocol.java | 9 -- .../rest/request/GridRestRequest.java | 17 --- .../rest/request/GridRestTaskRequest.java | 17 --- .../visor/cache/VisorCacheCompactTask.java | 27 +--- .../GridCacheOnCopyFlagAbstractSelfTest.java | 98 ++++++++++-- .../processors/cache/GridCacheTestEntryEx.java | 8 - ...chePartitionedReloadAllAbstractSelfTest.java | 3 +- .../cache/GridCacheAbstractQuerySelfTest.java | 45 +----- 42 files changed, 521 insertions(+), 926 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index d702540..dca9fa6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -574,6 +574,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 105: + msg = new CacheObjectByteArrayImpl(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index 4094489..923fd6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; import java.io.*; +import java.nio.*; /** * @@ -49,44 +51,74 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable return ctx.copyOnGet() && val != null && !ctx.processor().immutable(val); } - /** - * @return {@code True} if value is byte array. - */ - protected abstract boolean byteArray(); - /** {@inheritDoc} */ @Override public byte type() { - return byteArray() ? TYPE_BYTE_ARR : TYPE_REGULAR; + return TYPE_REGULAR; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - byte[] valBytes = byteArray() ? (byte[])val : this.valBytes; - assert valBytes != null; - out.writeBoolean(byteArray()); - U.writeByteArray(out, valBytes); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - boolean byteArr = in.readBoolean(); + valBytes = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + valBytes = reader.readByteArray("valBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); - byte[] valBytes = U.readByteArray(in); + } - if (byteArr) - val = valBytes; - else - this.valBytes = valBytes; + return true; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("valBytes", valBytes)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; } /** {@inheritDoc} */ public String toString() { - if (byteArray()) - return getClass().getSimpleName() + " [val=<byte array>, len=" + ((byte[])val).length + ']'; - else - return getClass().getSimpleName() + " [val=" + val + ", hasValBytes=" + (valBytes != null) + ']'; + return getClass().getSimpleName() + " [val=" + val + ", hasValBytes=" + (valBytes != null) + ']'; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java new file mode 100644 index 0000000..7f92483 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * Cache object over byte array. + */ +public class CacheObjectByteArrayImpl implements CacheObject, Externalizable { + /** */ + protected byte[] val; + + /** + * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. + */ + public CacheObjectByteArrayImpl() { + // No-op. + } + + /** + * @param val Value. + */ + public CacheObjectByteArrayImpl(byte[] val) { + assert val != null; + + this.val = val; + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { + if (cpy) + return (T)Arrays.copyOf(val, val.length); + + return (T)val; + } + + /** {@inheritDoc} */ + @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { + return val; + } + + /** {@inheritDoc} */ + @Override public byte type() { + return CacheObjectAdapter.TYPE_BYTE_ARR; + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + return this; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("val", val)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + val = reader.readByteArray("val"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + val = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeByteArray(out, val); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 105; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + public String toString() { + return "CacheObjectByteArrayImpl [arrLen" + (val != null ? val.length : 0) + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java index 005974d..234845a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java @@ -18,17 +18,11 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; -import java.nio.*; -import java.util.*; - /** * */ -@IgniteCodeGeneratingFail // Need to handle 'byteArray' state during write/read. public class CacheObjectImpl extends CacheObjectAdapter { /** * @@ -55,13 +49,8 @@ public class CacheObjectImpl extends CacheObjectAdapter { try { if (cpy) { - byte[] bytes = valueBytes(ctx); - - if (byteArray()) - return (T)Arrays.copyOf(bytes, bytes.length); - else - return (T)ctx.processor().unmarshal(ctx, valBytes, - val == null ? ctx.kernalContext().config().getClassLoader() : val.getClass().getClassLoader()); + return (T)ctx.processor().unmarshal(ctx, valBytes, + val == null ? ctx.kernalContext().config().getClassLoader() : val.getClass().getClassLoader()); } if (val != null) @@ -79,15 +68,7 @@ public class CacheObjectImpl extends CacheObjectAdapter { } /** {@inheritDoc} */ - @Override public boolean byteArray() { - return val instanceof byte[]; - } - - /** {@inheritDoc} */ @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { - if (byteArray()) - return (byte[])val; - if (valBytes == null) valBytes = ctx.processor().marshal(ctx, val); @@ -98,7 +79,7 @@ public class CacheObjectImpl extends CacheObjectAdapter { @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { assert val != null || valBytes != null; - if (valBytes == null && !byteArray()) + if (valBytes == null) valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val); } @@ -111,84 +92,11 @@ public class CacheObjectImpl extends CacheObjectAdapter { } /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - assert val != null || valBytes != null; - - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - boolean byteArr = byteArray(); - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray("valBytes", byteArr ? (byte[])val : valBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeBoolean("byteArr", byteArr)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - valBytes = reader.readByteArray("valBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - boolean byteArr = reader.readBoolean("byteArr"); - - if (!reader.isLastRead()) - return false; - - if (byteArr) { - val = valBytes; - - valBytes = null; - } - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ @Override public byte directType() { return 89; } /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ @Override public int hashCode() { assert false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java index d4a13ac..f53fcc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java @@ -1290,21 +1290,6 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { public void clear(long timeout) throws IgniteCheckedException; /** - * Clears serialized value bytes from entry (if any) leaving only object representation. - * - * @param key Key to compact. - * @throws IgniteCheckedException If failed to compact. - * @return {@code true} if entry was deleted from cache (i.e. was expired). - */ - public boolean compact(K key) throws IgniteCheckedException; - - /** - * Clears serialized value bytes from cache entries (if any) leaving only object representation. - * @throws IgniteCheckedException If failed to compact. - */ - public void compactAll() throws IgniteCheckedException; - - /** * Removes given key mapping from cache. If cache previously contained value for the given key, * then this value is returned. In case of {@link CacheMode#PARTITIONED} or {@link CacheMode#REPLICATED} * caches, the value will be loaded from the primary node, which in its turn may load the value http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index c5707ae..24e7ced 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1474,16 +1474,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(); } - /** {@inheritDoc} */ - @Override public boolean compact(K key) throws IgniteCheckedException { - return compact(key, (CacheEntryPredicate[])null); - } - - /** {@inheritDoc} */ - @Override public void compactAll() throws IgniteCheckedException { - compactAll(keySet()); - } - /** * @param entry Removes entry from cache if currently mapped value is the same as passed. */ @@ -1987,7 +1977,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final boolean keepCacheObjects ) { if (F.isEmpty(keys)) - return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K1, V1>emptyMap()); + return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap()); IgniteTxLocalAdapter tx = null; @@ -4127,13 +4117,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, .execute(); return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { - @Override - protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { + @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { return new CacheEntryImpl<>(e.getKey(), e.getValue()); } - @Override - protected void remove(Cache.Entry<K, V> item) { + @Override protected void remove(Cache.Entry<K, V> item) { ctx.gate().enter(); try { @@ -4750,40 +4738,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * @param key Key. * @param filter Filters to evaluate. - * @return {@code True} if compacted. - * @throws IgniteCheckedException If failed. - */ - public boolean compact(K key, @Nullable CacheEntryPredicate... filter) - throws IgniteCheckedException { - ctx.denyOnFlag(READ); - - A.notNull(key, "key"); - - if (keyCheck) - validateCacheKey(key); - - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - - GridCacheEntryEx entry = peekEx(cacheKey); - - try { - if (entry != null && entry.compact(filter)) { - removeIfObsolete(cacheKey); - - return true; - } - } - catch (GridCacheEntryRemovedException ignored) { - if (log().isDebugEnabled()) - log().debug("Got removed entry in invalidate(...): " + key); - } - - return false; - } - - /** - * @param key Key. - * @param filter Filters to evaluate. * @return {@code True} if evicted. */ public boolean evict(K key, @Nullable CacheEntryPredicate... filter) { @@ -4900,21 +4854,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** - * @param keys Keys. - * @param filter Filters to evaluate. - * @throws IgniteCheckedException If failed. - */ - public void compactAll(@Nullable Iterable<K> keys, - @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { - ctx.denyOnFlag(READ); - - if (keys != null) { - for (K key : keys) - compact(key, filter); - } - } - - /** * @param key Key. * @param deserializePortable Deserialize portable flag. * @return Cached value. @@ -4946,8 +4885,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return getAllAsync(Collections.singletonList(key), deserializePortable).chain( new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override - public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { Map<K, V> map = e.get(); assert map.isEmpty() || map.size() == 1 : map.size(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 87f34e1..c1f3b9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1753,7 +1753,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Cache object. */ @Nullable public CacheObject toCacheObject(@Nullable Object obj) { - return cacheObjects().toCacheObject(cacheObjCtx, obj); + return cacheObjects().toCacheObject(cacheObjCtx, obj, true); } /** @@ -1761,17 +1761,18 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Cache key object. */ public KeyCacheObject toCacheKeyObject(Object obj) { - return cacheObjects().toCacheKeyObject(cacheObjCtx, obj); + return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, true); } /** * @param bytes Bytes. * @return Cache key object. + * @throws IgniteCheckedException If failed. */ public KeyCacheObject toCacheKeyObject(byte[] bytes) throws IgniteCheckedException { Object obj = ctx.cacheObjects().unmarshal(cacheObjCtx, bytes, deploy().localLoader()); - return cacheObjects().toCacheKeyObject(cacheObjCtx, obj); + return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, false); } /** @@ -1790,7 +1791,9 @@ public class GridCacheContext<K, V> implements Externalizable { if (ldr == null) return null; - return ctx.cacheObjects().toCacheObject(cacheObjCtx, ctx.cacheObjects().unmarshal(cacheObjCtx, bytes, ldr)); + return ctx.cacheObjects().toCacheObject(cacheObjCtx, + ctx.cacheObjects().unmarshal(cacheObjCtx, bytes, ldr), + false); } return ctx.cacheObjects().toCacheObject(cacheObjCtx, type, bytes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index d445a12..5fbd5d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -215,17 +215,6 @@ public interface GridCacheEntryEx { throws GridCacheEntryRemovedException, IgniteCheckedException; /** - * Optimizes the size of this entry. - * - * @param filter Optional filter that entry should pass before invalidation. - * @throws GridCacheEntryRemovedException If entry was removed. - * @throws IgniteCheckedException If operation failed. - * @return {@code true} if entry was not being used and could be removed. - */ - public boolean compact(@Nullable CacheEntryPredicate[] filter) - throws GridCacheEntryRemovedException, IgniteCheckedException; - - /** * @param swap Swap flag. * @param obsoleteVer Version for eviction. * @param filter Optional filter. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index 426465b..4845635 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -400,7 +400,7 @@ public class GridCacheEntryInfo implements Message { Object key0 = ctx.cacheObjects().unmarshal(cacheObjCtx, keyBytes, clsLdr); - key = ctx.cacheObjects().toCacheKeyObject(cacheObjCtx, key0); + key = ctx.cacheObjects().toCacheKeyObject(cacheObjCtx, key0, false); } else key.finishUnmarshal(ctx.cacheObjectContext(), clsLdr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5921d0b..565d8ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2604,46 +2604,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } } - /** {@inheritDoc} */ - @Override public boolean compact(@Nullable CacheEntryPredicate[] filter) - throws GridCacheEntryRemovedException, IgniteCheckedException { - // For optimistic checking. - GridCacheVersion startVer; - - synchronized (this) { - checkObsolete(); - - startVer = ver; - } - - if (!cctx.isAll(this, filter)) - return false; - - synchronized (this) { - checkObsolete(); - - if (deletedUnlocked()) - return false; // Cannot compact soft-deleted entries. - - if (startVer.equals(ver)) { - if (hasValueUnlocked() && !checkExpired()) { -// TODO IGNITE-51. -// if (!isOffHeapValuesOnly()) { -// if (val != null) -// valBytes = null; -// } - - return false; - } - else - return clear(nextVersion(), false, filter); - } - } - - // If version has changed do it again. - return compact(filter); - } - /** * * @param val New value. @@ -3188,7 +3148,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (valPtr != 0) { CacheObject val0 = cctx.fromOffheap(valPtr, tmp); - val0.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); + if (!tmp && cctx.kernalContext().config().isPeerClassLoadingEnabled()) + val0.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); return val0; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index c717d17..8fd432e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -728,7 +728,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * Reset MVCC context. */ public void contextReset() { - pending.set(new LinkedList<GridCacheMvccCandidate<K>>()); + pending.set(new LinkedList<GridCacheMvccCandidate>()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index da7f7e4..73bcc64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -895,16 +895,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public boolean compact(K key) throws IgniteCheckedException { - return cache.compact(key, filter); - } - - /** {@inheritDoc} */ - @Override public void compactAll() throws IgniteCheckedException { - cache.compactAll(keySet()); - } - - /** {@inheritDoc} */ @Override public V remove(K key, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { return removeAsync(key, filter).get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index b995b8f..4a1f83c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1288,31 +1288,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public boolean compact(K key) - throws IgniteCheckedException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.compact(key); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public void compactAll() throws IgniteCheckedException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - delegate.compactAll(); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Nullable @Override public V remove(K key, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 1cf6d00..58ad33b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -43,7 +43,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb */ public KeyCacheObjectImpl(Object val, byte[] valBytes) { assert val != null; - assert valBytes != null || this instanceof UserKeyCacheObjectImpl : this; this.val = val; this.valBytes = valBytes; @@ -59,13 +58,9 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb } /** {@inheritDoc} */ - @Override public boolean byteArray() { - return false; - } - - /** {@inheritDoc} */ @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { - assert valBytes != null : this; + if (valBytes == null) + valBytes = ctx.processor().marshal(ctx, val); return valBytes; } @@ -109,60 +104,11 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb } /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray("valBytes", valBytes)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - valBytes = reader.readByteArray("valBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ @Override public byte directType() { return DIRECT_TYPE; } /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 1; - } - - /** {@inheritDoc} */ @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { if (valBytes == null) valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java deleted file mode 100644 index ad68bb4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Cache object wrapping object provided by user. Need to be copied before stored in cache. - */ -public class UserCacheObjectImpl extends CacheObjectImpl { - /** - * @param val Value. - */ - public UserCacheObjectImpl(Object val) { - super(val, null); - } - - /** - * - */ - public UserCacheObjectImpl() { - // No-op. - } - - /** {@inheritDoc} */ - @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { - return super.value(ctx, false); - } - - /** {@inheritDoc} */ - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - if (needCopy(ctx)) { - if (val instanceof byte[]) { - byte[] byteArr = (byte[])val; - - return new CacheObjectImpl(Arrays.copyOf(byteArr, byteArr.length), null); - } - else { - try { - if (valBytes == null) - valBytes = ctx.processor().marshal(ctx, val); - - if (ctx.unmarshalValues()) { - ClassLoader ldr = ctx.p2pEnabled() ? - IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader(); - - Object val = ctx.processor().unmarshal(ctx, valBytes, ldr); - - return new CacheObjectImpl(val, valBytes); - } - - return new CacheObjectImpl(null, valBytes); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal object: " + val, e); - } - } - } - else - return new CacheObjectImpl(val, valBytes); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java deleted file mode 100644 index e9fb75b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -/** - * Cache object wrapping key provided by user. Need to be copied before stored in cache. - */ -public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { - /** - * @param val Key value. - * @param bytes Bytes. - */ - public UserKeyCacheObjectImpl(Object val, byte[] bytes) { - super(val, bytes); - } - - /** - * - */ - public UserKeyCacheObjectImpl() { - // No-op. - } - - /** {@inheritDoc} */ - @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { - return super.value(ctx, false); - } - - /** {@inheritDoc} */ - @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { - if (valBytes == null) - valBytes = ctx.processor().marshal(ctx, val); - - return valBytes; - } - - /** {@inheritDoc} */ - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - try { - if (valBytes == null) - valBytes = ctx.processor().marshal(ctx, val); - - if (needCopy(ctx)) { - Object val = ctx.processor().unmarshal(ctx, - valBytes, - this.val.getClass().getClassLoader()); - - return new KeyCacheObjectImpl(val, valBytes); - } - - return new KeyCacheObjectImpl(val, valBytes); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal object: " + val, e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2ec1908..cf0ee34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -279,7 +279,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); if (F.isEmpty(keys)) - return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); + return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); if (keyCheck) validateCacheKeys(keys); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index e33b6a2..1687cb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -431,12 +431,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public boolean compact(K key, - @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { - return super.compact(key, filter) | dht().compact(key, filter); - } - - /** {@inheritDoc} */ @Override public Cache.Entry<K, V> entry(K key) { // We don't try wrap entry from near or dht cache. // Created object will be wrapped once some method is called. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 778570a..f36b4ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -354,7 +355,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut topFut.listen(new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> t) { cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { + @Override + public void run() { prepare(); } }); @@ -379,24 +381,24 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut if (tx.activeCacheIds().isEmpty()) return cctx.exchange().lastTopologyFuture(); - GridCacheContext<K, V> nonLocalCtx = null; + GridCacheContext<K, V> nonLocCtx = null; for (int cacheId : tx.activeCacheIds()) { GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); if (!cacheCtx.isLocal()) { - nonLocalCtx = cacheCtx; + nonLocCtx = cacheCtx; break; } } - if (nonLocalCtx == null) + if (nonLocCtx == null) return cctx.exchange().lastTopologyFuture(); - nonLocalCtx.topology().readLock(); + nonLocCtx.topology().readLock(); - return nonLocalCtx.topology().topologyVersionFuture(); + return nonLocCtx.topology().topologyVersionFuture(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 05b32e2..8d5284a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -196,8 +196,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage * @return Query info. */ @Nullable private GridCacheQueryInfo distributedQueryInfo(UUID sndId, GridCacheQueryRequest req) { - CacheEntryPredicate prjPred = req.projectionFilter(); - IgniteReducer<Object, Object> rdc = req.reducer(); IgniteClosure<Object, Object> trans = req.transformer(); @@ -209,7 +207,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage GridCacheQueryAdapter<?> qry = new GridCacheQueryAdapter<>( cctx, - prjPred, req.type(), log, req.pageSize(), @@ -229,7 +226,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage return new GridCacheQueryInfo( false, - prjPred, trans, rdc, qry, @@ -509,7 +505,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().clause(), clsName, qry.query().scanFilter(), - qry.query().projectionFilter(), qry.reducer(), qry.transform(), qry.query().pageSize(), @@ -618,7 +613,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().clause(), null, null, - qry.query().projectionFilter(), qry.reducer(), qry.transform(), qry.query().pageSize(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java index caf34cb..9026b16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java @@ -101,8 +101,6 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap private GridCacheQueryInfo localQueryInfo() throws IgniteCheckedException { GridCacheQueryBean qry = query(); - CacheEntryPredicate prjPred = qry.query().projectionFilter(); - Marshaller marsh = cctx.marshaller(); IgniteReducer<Object, Object> rdc = qry.reducer() != null ? @@ -113,7 +111,6 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap return new GridCacheQueryInfo( true, - prjPred, trans, rdc, qry.query(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java index 2e9f88f..41faadd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java @@ -26,7 +26,6 @@ import org.apache.ignite.lang.*; import org.apache.ignite.spi.indexing.*; import org.jetbrains.annotations.*; -import javax.cache.*; import java.io.*; import java.util.*; @@ -70,7 +69,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext return new GridCacheQueryAdapter<>(ctx, SQL, - filter(), ctx.kernalContext().query().typeName(U.box(cls)), clause, null, @@ -85,7 +83,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext return new GridCacheQueryAdapter<>(ctx, SQL, - filter(), clsName, clause, null, @@ -99,7 +96,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext return new GridCacheQueryAdapter<>(ctx, SQL_FIELDS, - filter(), null, qry, null, @@ -114,7 +110,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext return new GridCacheQueryAdapter<>(ctx, TEXT, - filter(), ctx.kernalContext().query().typeName(U.box(cls)), search, null, @@ -129,7 +124,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext return new GridCacheQueryAdapter<>(ctx, TEXT, - filter(), clsName, search, null, @@ -142,7 +136,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext @Override public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter) { return new GridCacheQueryAdapter<>(ctx, SCAN, - filter(), null, null, (IgniteBiPredicate<Object, Object>)filter, @@ -158,7 +151,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext public <R> CacheQuery<R> createSpiQuery() { return new GridCacheQueryAdapter<>(ctx, SPI, - filter(), null, null, null, @@ -221,7 +213,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext return new GridCacheQueryAdapter<>(ctx, SQL_FIELDS, - filter(), null, qry, null, @@ -229,14 +220,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext prj != null && prj.isKeepPortable()); } - /** - * @return Optional projection filter. - */ - @SuppressWarnings("unchecked") - @Nullable private CacheEntryPredicate filter() { - return prj == null ? null : prj.predicate(); - } - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 57d3a11..fea670f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -43,9 +43,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private final GridCacheContext<?, ?> cctx; /** */ - private final CacheEntryPredicate prjPred; - - /** */ private final GridCacheQueryType type; /** */ @@ -101,11 +98,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param filter Scan filter. * @param incMeta Include metadata flag. * @param keepPortable Keep portable flag. - * @param prjPred Cache projection filter. */ public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx, GridCacheQueryType type, - @Nullable CacheEntryPredicate prjPred, @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate<Object, Object> filter, @@ -118,7 +113,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.type = type; this.clsName = clsName; this.clause = clause; - this.prjPred = prjPred; this.filter = filter; this.incMeta = incMeta; this.keepPortable = keepPortable; @@ -148,7 +142,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param taskHash Task hash. */ public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx, - CacheEntryPredicate prjPred, GridCacheQueryType type, IgniteLogger log, int pageSize, @@ -165,7 +158,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { UUID subjId, int taskHash) { this.cctx = cctx; - this.prjPred = prjPred; this.type = type; this.log = log; this.pageSize = pageSize; @@ -184,13 +176,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } /** - * @return cache projection filter. - */ - @Nullable public CacheEntryPredicate projectionFilter() { - return prjPred; - } - - /** * @return Type. */ public GridCacheQueryType type() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java index 865d431..61a5dc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java @@ -33,9 +33,6 @@ class GridCacheQueryInfo { private boolean loc; /** */ - private CacheEntryPredicate prjPred; - - /** */ private IgniteClosure<Object, Object> trans; /** */ @@ -64,7 +61,6 @@ class GridCacheQueryInfo { /** * @param loc {@code true} if local query. - * @param prjPred Projection predicate. * @param trans Transforming closure. * @param rdc Reducer. * @param qry Query base. @@ -77,7 +73,6 @@ class GridCacheQueryInfo { */ GridCacheQueryInfo( boolean loc, - CacheEntryPredicate prjPred, IgniteClosure<Object, Object> trans, IgniteReducer<Object, Object> rdc, GridCacheQueryAdapter<?> qry, @@ -89,7 +84,6 @@ class GridCacheQueryInfo { Object[] args ) { this.loc = loc; - this.prjPred = prjPred; this.trans = trans; this.rdc = rdc; this.qry = qry; @@ -123,13 +117,6 @@ class GridCacheQueryInfo { } /** - * @return Projection predicate. - */ - CacheEntryPredicate projectionPredicate() { - return prjPred; - } - - /** * @return Transformer. */ IgniteClosure<?, Object> transformer() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index d23535a..27bbea7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -734,9 +734,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @SuppressWarnings({"unchecked"}) private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry) throws IgniteCheckedException { - CacheEntryPredicate filter = qry.projectionFilter(); - - CacheProjection<K, V> prj0 = filter != null ? cctx.cache().projection(filter) : cctx.cache(); + CacheProjection<K, V> prj0 = cctx.cache(); if (qry.keepPortable()) prj0 = prj0.keepPortable(); @@ -891,13 +889,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte */ private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry) throws IgniteCheckedException { - CacheEntryPredicate prjPred = qry.projectionFilter(); - IgniteBiPredicate<K, V> filter = qry.scanFilter(); Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(); - return scanIterator(it, prjPred, filter, qry.keepPortable()); + return scanIterator(it, filter, qry.keepPortable()); } /** @@ -905,32 +901,28 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Offheap iterator. */ private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry) { - CacheEntryPredicate prjPred = qry.projectionFilter(); - IgniteBiPredicate<K, V> filter = qry.scanFilter(); - if (cctx.offheapTiered() && (prjPred != null || filter != null)) { - OffheapIteratorClosure c = new OffheapIteratorClosure(prjPred, filter, qry.keepPortable()); + if (cctx.offheapTiered() && filter != null) { + OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable()); return cctx.swap().rawOffHeapIterator(c); } else { Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(); - return scanIterator(it, prjPred, filter, qry.keepPortable()); + return scanIterator(it, filter, qry.keepPortable()); } } /** * @param it Lazy swap or offheap iterator. - * @param prjPred Projection predicate. * @param filter Scan filter. * @param keepPortable Keep portable flag. * @return Iterator. */ private GridIteratorAdapter<IgniteBiTuple<K, V>> scanIterator( @Nullable final Iterator<Map.Entry<byte[], byte[]>> it, - @Nullable final CacheEntryPredicate prjPred, @Nullable final IgniteBiPredicate<K, V> filter, final boolean keepPortable) { if (it == null) @@ -968,14 +960,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte while (it.hasNext()) { final LazySwapEntry e = new LazySwapEntry(it.next()); - if (prjPred != null) { - Cache.Entry<K, V> Entry = new GridCacheScanSwapEntry(e); - -// TODO IGNITE-51. -// if (!prjPred.apply((Cache.Entry<Object, Object>)Entry)) -// continue; - } - if (filter != null) { K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable); V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable); @@ -1036,11 +1020,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { // Preparing query closures. - CacheEntryPredicate prjFilter = qryInfo.projectionPredicate(); IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)qryInfo.transformer(); IgniteReducer<Object, Object> rdc = (IgniteReducer<Object, Object>)qryInfo.reducer(); - injectResources(prjFilter); injectResources(trans); injectResources(rdc); @@ -1213,11 +1195,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { // Preparing query closures. - CacheEntryPredicate prjFilter = qryInfo.projectionPredicate(); IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer(); IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer(); - injectResources(prjFilter); injectResources(trans); injectResources(rdc); @@ -1804,42 +1784,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** - * Gets projection filter for query. - * - * @param qry Query. - * @return Filter. - */ - @SuppressWarnings("unchecked") - @Nullable private IndexingQueryFilter projectionFilter(GridCacheQueryAdapter<?> qry) { - assert qry != null; - - final CacheEntryPredicate prjFilter = qry.projectionFilter(); - - if (prjFilter == null || F.isAlwaysTrue(prjFilter)) - return null; - - return new IndexingQueryFilter() { - @Nullable @Override public IgniteBiPredicate<K, V> forSpace(String spaceName) { - if (!F.eq(space, spaceName)) - return null; - - return new IgniteBiPredicate<K, V>() { - @Override public boolean apply(K k, V v) { - try { - GridCacheEntryEx entry = context().cache().peekEx(cctx.toCacheKeyObject(k)); - - return entry != null && prjFilter.apply(entry); - } - catch (GridDhtInvalidPartitionException ignore) { - return false; - } - } - }; - } - }; - } - - /** * @param <K> Key type. * @param <V> Value type. * @return Predicate. @@ -1873,7 +1817,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Filter. */ private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) { - return and(backupsFilter(qry.includeBackups()), projectionFilter(qry)); + return backupsFilter(qry.includeBackups()); } /** @@ -2486,26 +2430,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private static final long serialVersionUID = 7410163202728985912L; /** */ - private CacheEntryPredicate prjPred; - - /** */ private IgniteBiPredicate<K, V> filter; /** */ private boolean keepPortable; /** - * @param prjPred Projection filter. * @param filter Filter. * @param keepPortable Keep portable flag. */ private OffheapIteratorClosure( - @Nullable CacheEntryPredicate prjPred, @Nullable IgniteBiPredicate<K, V> filter, boolean keepPortable) { - assert prjPred != null || filter != null; + assert filter != null; - this.prjPred = prjPred; this.filter = filter; this.keepPortable = keepPortable; } @@ -2516,21 +2454,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throws IgniteCheckedException { LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr); - if (prjPred != null) { - Cache.Entry<K, V> entry = new GridCacheScanSwapEntry(e); + K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable); + V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable); -// TODO IGNITE-51. -// if (!prjPred.apply((Cache.Entry<Object, Object>)entry)) -// return null; - } - - if (filter != null) { - K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable); - V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable); - - if (!filter.apply(key, val)) - return null; - } + if (!filter.apply(key, val)) + return null; return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())) ; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index e2b843b..20b4812 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -66,9 +66,6 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache private byte[] keyValFilterBytes; /** */ - private CacheEntryPredicate prjFilter; - - /** */ @GridDirectTransient private IgniteReducer<Object, Object> rdc; @@ -177,7 +174,6 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache * @param clause Query clause. * @param clsName Query class name. * @param keyValFilter Key-value filter. - * @param prjFilter Projection filter. * @param rdc Reducer. * @param trans Transformer. * @param pageSize Page size. @@ -194,7 +190,6 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache String clause, String clsName, IgniteBiPredicate<Object, Object> keyValFilter, - CacheEntryPredicate prjFilter, IgniteReducer<Object, Object> rdc, IgniteClosure<Object, Object> trans, int pageSize, @@ -217,7 +212,6 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache this.clause = clause; this.clsName = clsName; this.keyValFilter = keyValFilter; - this.prjFilter = prjFilter; this.rdc = rdc; this.trans = trans; this.pageSize = pageSize; @@ -241,13 +235,6 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache keyValFilterBytes = CU.marshal(ctx, keyValFilter); } - if (prjFilter != null) { - if (ctx.deploymentEnabled()) - prepareObject(prjFilter, ctx); - - prjFilter.prepareMarshal(ctx.cacheContext(cacheId)); - } - if (rdc != null) { if (ctx.deploymentEnabled()) prepareObject(rdc, ctx); @@ -281,9 +268,6 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache if (keyValFilterBytes != null) keyValFilter = mrsh.unmarshal(keyValFilterBytes, ldr); - if (prjFilter != null) - prjFilter.finishUnmarshal(ctx.cacheContext(cacheId), ldr); - if (rdcBytes != null) rdc = mrsh.unmarshal(rdcBytes, ldr); @@ -368,11 +352,6 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache return keyValFilter; } - /** {@inheritDoc} */ - public CacheEntryPredicate projectionFilter() { - return prjFilter; - } - /** * @return Reducer. */ @@ -530,36 +509,30 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 16: - if (!writer.writeMessage("prjFilter", prjFilter)) - return false; - - writer.incrementState(); - - case 17: if (!writer.writeByteArray("rdcBytes", rdcBytes)) return false; writer.incrementState(); - case 18: + case 17: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 19: + case 18: if (!writer.writeInt("taskHash", taskHash)) return false; writer.incrementState(); - case 20: + case 19: if (!writer.writeByteArray("transBytes", transBytes)) return false; writer.incrementState(); - case 21: + case 20: if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1)) return false; @@ -686,14 +659,6 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 16: - prjFilter = reader.readMessage("prjFilter"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 17: rdcBytes = reader.readByteArray("rdcBytes"); if (!reader.isLastRead()) @@ -701,7 +666,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); - case 18: + case 17: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -709,7 +674,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); - case 19: + case 18: taskHash = reader.readInt("taskHash"); if (!reader.isLastRead()) @@ -717,7 +682,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); - case 20: + case 19: transBytes = reader.readByteArray("transBytes"); if (!reader.isLastRead()) @@ -725,7 +690,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); - case 21: + case 20: byte typeOrd; typeOrd = reader.readByte("type"); @@ -749,7 +714,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 22; + return 21; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index bc3744e..e66240f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1532,7 +1532,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (txEntry != null) - txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes()); + txEntry.cached(entryEx(cacheCtx, txKey)); continue; // While loop. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 193e3f8..6fd90a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -393,13 +393,13 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); for (Map.Entry<K, V> entry : entries) - keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey())); + keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey(), true)); } Collection<? extends IgniteDataLoaderEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, IgniteDataLoaderEntry>() { @Override public IgniteDataLoaderEntry apply(Entry<K, V> e) { - KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey()); - CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue()); + KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), true); + CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), true); return new IgniteDataLoaderEntry(key, val); } @@ -410,7 +410,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay return new IgniteFutureImpl<>(resFut); } catch (IgniteException e) { - return new IgniteFinishedFutureImpl<>(ctx, e); + return new IgniteFinishedFutureImpl<>(e); } finally { leaveBusy(); @@ -441,10 +441,10 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay public IgniteFuture<?> addDataInternal(Collection<? extends IgniteDataLoaderEntry> entries) { enterBusy(); - GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx); + GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); try { - resFut.listenAsync(rmvActiveFut); + resFut.listen(rmvActiveFut); activeFuts.add(resFut); @@ -485,8 +485,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay @Override public IgniteFuture<?> addData(K key, V val) { A.notNull(key, "key"); - KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key); - CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val); + KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, true); + CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true); return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key0, val0))); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 5d96086..72b8dd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -113,7 +113,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite return set != null ? set.size() : 0; } - CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, null, + CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, new GridSetQueryPredicate<>(id, collocated), false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); @@ -344,7 +344,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite @SuppressWarnings("unchecked") private GridCloseableIterator<T> iterator0() { try { - CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, null, + CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, new GridSetQueryPredicate<>(id, collocated), false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7dac569/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessor.java index aaf615c..7c63df9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessor.java @@ -45,7 +45,7 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { public int typeId(Object obj); /** - * Converts temporary offheap object to heap-based. + * Converts temporary off-heap object to heap-based. * * @param ctx Context. * @param obj Object. @@ -55,13 +55,6 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { @Nullable public Object unwrapTemporary(GridCacheContext ctx, @Nullable Object obj) throws IgniteException; /** - * @param obj Object to marshal. - * @return Portable object. - * @throws IgniteException In case of error. - */ - public Object marshalToPortable(@Nullable Object obj) throws IgniteException; - - /** * Prepares cache object for cache (e.g. copies user-provided object if needed). * * @param obj Cache object. @@ -71,18 +64,6 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { @Nullable public CacheObject prepareForCache(@Nullable CacheObject obj, GridCacheContext cctx); /** - * @return Portable marshaller for client connectivity or {@code null} if it's not - * supported (in case of OS edition). - */ - @Nullable public GridClientMarshaller portableMarshaller(); - - /** - * @param marsh Client marshaller. - * @return Whether marshaller is portable. - */ - public boolean isPortable(GridClientMarshaller marsh); - - /** * Checks whether object is portable object. * * @param obj Object to check. @@ -132,10 +113,21 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { /** * @param ctx Cache context. + * @param obj Key value. + * @param userObj If {@code true} then given object is object provided by user and should be copied + * before stored in cache. + * @return Cache key object. + */ + public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj); + + /** + * @param ctx Cache context. * @param obj Object. + * @param userObj If {@code true} then given object is object provided by user and should be copied + * before stored in cache. * @return Cache object. */ - @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj); + @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, boolean userObj); /** * @param ctx Cache context. @@ -155,13 +147,6 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp) throws IgniteCheckedException; /** - * @param ctx Cache context. - * @param obj Key value. - * @return Cache key object. - */ - public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj); - - /** * @param obj Value. * @return {@code True} if object is of known immutable type of it is marked * with {@link IgniteImmutable} annotation. @@ -169,6 +154,7 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { public boolean immutable(Object obj); /** + * @param cacheName Cache name. * @return {@code True} if portable format should be preserved when passing values to cache store. */ public boolean keepPortableInStore(@Nullable String cacheName);