Repository: incubator-ignite Updated Branches: refs/heads/ignite-51-filters 6febd89af -> 5d40c422e
# ignite-51 filters refactoring Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5d40c422 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5d40c422 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5d40c422 Branch: refs/heads/ignite-51-filters Commit: 5d40c422e7260f92d087536f6a184a64dfcf42bf Parents: 6febd89 Author: sboikov <semen.boi...@inria.fr> Authored: Tue Mar 3 22:37:54 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Wed Mar 4 07:56:33 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 10 ++ .../cache/CacheEntryPredicateAdapter.java | 42 ++++++-- .../cache/CacheEntryPredicateContainsValue.java | 42 +++++++- .../cache/CacheEntryPredicateHasValue.java | 2 +- .../cache/CacheEntryPredicateNoValue.java | 2 +- .../cache/CacheEntrySerializablePredicate.java | 66 +++++++++--- .../processors/cache/GridCacheContext.java | 54 ++++------ .../processors/cache/GridCacheMapEntry.java | 8 +- .../processors/cache/GridCacheMessage.java | 100 +------------------ .../processors/cache/GridCacheUtils.java | 54 ++++++---- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 43 ++++++++ .../distributed/near/GridNearLockFuture.java | 4 +- .../distributed/near/GridNearLockRequest.java | 27 ++++- .../cache/query/GridCacheQueryRequest.java | 14 +-- .../cache/transactions/IgniteTxEntry.java | 13 +++ .../transactions/IgniteTxLocalAdapter.java | 4 +- .../org/apache/ignite/internal/util/F0.java | 82 ++++++++++++++- .../cache/GridCacheAbstractFullApiSelfTest.java | 23 +++-- 19 files changed, 382 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 0100b2a..a9263b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -539,6 +539,16 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 98: + msg = new CacheEntryPredicateContainsValue(); + + break; + + case 99: + msg = new CacheEntrySerializablePredicate(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java index c11de54..dd6df0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.plugin.extensions.communication.*; import java.nio.*; @@ -28,39 +29,68 @@ import java.nio.*; public abstract class CacheEntryPredicateAdapter implements CacheEntryPredicate { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - assert false; + // No-op. } /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { - assert false; + // No-op. } /** {@inheritDoc} */ @Override public byte directType() { - assert false; + assert false : this; return 0; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - assert false; + assert false : this; return 0; } /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - assert false; + assert false : this; return false; } /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - assert false; + assert false : this; return false; } + + /** + * @param e Entry. + * @return {@code True} if given entry has value. + */ + protected boolean hasValue(GridCacheEntryEx e) { + try { + if (e.hasValue()) + return true; + + GridCacheContext cctx = e.context(); + + if (cctx.transactional()) { + IgniteInternalTx tx = cctx.tm().userTx(); + + if (tx != null) + return tx.peek(cctx, false, e.key(), null) != null; + } + + return false; + } + catch (GridCacheFilterFailedException err) { + assert false; + + err.printStackTrace(); + + return false; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java index 7aff5a0..412b685 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java @@ -74,22 +74,56 @@ public class CacheEntryPredicateContainsValue implements CacheEntryPredicate { /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - return false; + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("val", val)) + return false; + + writer.incrementState(); + + } + + return true; } /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - return false; + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + val = reader.readMessage("val"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; } /** {@inheritDoc} */ @Override public byte directType() { - return 0; + return 98; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 0; + return 1; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java index 173c6e9..3b921ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java @@ -23,6 +23,6 @@ package org.apache.ignite.internal.processors.cache; public class CacheEntryPredicateHasValue extends CacheEntryPredicateAdapter { /** {@inheritDoc} */ @Override public boolean apply(GridCacheEntryEx e) { - return e.hasValue(); + return hasValue(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java index 6a4df21..13f022d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java @@ -23,6 +23,6 @@ package org.apache.ignite.internal.processors.cache; public class CacheEntryPredicateNoValue extends CacheEntryPredicateAdapter { /** {@inheritDoc} */ @Override public boolean apply(GridCacheEntryEx e) { - return !e.hasValue(); + return !hasValue(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java index 73b0789..583882f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java @@ -31,7 +31,7 @@ public class CacheEntrySerializablePredicate implements CacheEntryPredicate { /** */ @GridToStringInclude @GridDirectTransient - private CacheEntryPredicate[] p; + private CacheEntryPredicate p; /** */ private byte[] bytes; @@ -44,25 +44,36 @@ public class CacheEntrySerializablePredicate implements CacheEntryPredicate { } /** - * @param p Predicate. + * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. */ - public CacheEntrySerializablePredicate(CacheEntryPredicate... p) { + public CacheEntrySerializablePredicate(CacheEntryPredicate p) { assert p != null; this.p = p; } + /** + * @return Predicate. + */ + public CacheEntryPredicate predicate() { + return p; + } + /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { assert bytes != null; p = ctx.marshaller().unmarshal(bytes, ldr); + + p.finishUnmarshal(ctx, ldr); } /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { assert p != null; + p.prepareMarshal(ctx); + bytes = ctx.marshaller().marshal(p); } @@ -70,31 +81,60 @@ public class CacheEntrySerializablePredicate implements CacheEntryPredicate { @Override public boolean apply(GridCacheEntryEx e) { assert p != null; - for (CacheEntryPredicate p0 : p) { - if (!p0.apply(e)) - return false; - } - - return true; + return p.apply(e); } /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - return false; + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("bytes", bytes)) + return false; + + writer.incrementState(); + + } + + return true; } /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - return false; + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + bytes = reader.readByteArray("bytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; } /** {@inheritDoc} */ @Override public byte directType() { - return 0; + return 99; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 0; + return 1; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index eb31100..4a2cf90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -148,16 +148,10 @@ public class GridCacheContext<K, V> implements Externalizable { private GridCacheAdapter<K, V> cache; /** No value filter array. */ - private CacheEntryPredicate[] noValArr0; + private CacheEntryPredicate[] noValArr; /** Has value filter array. */ - private CacheEntryPredicate[] hasValArr0; - - /** No-peek-value filter array. */ - private IgnitePredicate<Cache.Entry<Object, Object>>[] noPeekArr; - - /** Has-peek-value filter array. */ - private IgnitePredicate<Cache.Entry<K, V>>[] hasPeekArr; + private CacheEntryPredicate[] hasValArr; /** Cached local rich node. */ private ClusterNode locNode; @@ -287,11 +281,8 @@ public class GridCacheContext<K, V> implements Externalizable { log = ctx.log(getClass()); - noPeekArr = new IgnitePredicate[]{F.cacheNoPeekValue()}; - hasPeekArr = new IgnitePredicate[]{F.cacheHasPeekValue()}; - - noValArr0 = new CacheEntryPredicate[]{new CacheEntryPredicateNoValue()}; - hasValArr0 = new CacheEntryPredicate[]{new CacheEntryPredicateHasValue()}; + noValArr = new CacheEntryPredicate[]{new CacheEntrySerializablePredicate(new CacheEntryPredicateNoValue())}; + hasValArr = new CacheEntryPredicate[]{new CacheEntrySerializablePredicate(new CacheEntryPredicateHasValue())}; cacheObjCtx = new CacheObjectContext(ctx); @@ -963,43 +954,34 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return No get-value filter. + * @param p Predicate. + * @return {@code True} if given predicate is filter for {@code putIfAbsent} operation. */ - @SuppressWarnings("unchecked") - public <K, V> IgnitePredicate<Cache.Entry<K, V>>[] noPeekArray() { - return (IgnitePredicate<Cache.Entry<K, V>>[])((IgnitePredicate[])noPeekArr); - } + public boolean putIfAbsentFilter(@Nullable CacheEntryPredicate[] p) { + if (p == null || p.length == 0) + return false; - /** - * @return Has get-value filer. - */ - public IgnitePredicate<Cache.Entry<K, V>>[] hasPeekArray() { - return hasPeekArr; + for (CacheEntryPredicate p0 : p) { + if ((p0 instanceof CacheEntrySerializablePredicate) && + ((CacheEntrySerializablePredicate) p0).predicate() instanceof CacheEntryPredicateNoValue) + return true; + } + + return false; } /** * @return No value filter. */ public CacheEntryPredicate[] noValArray() { - return noValArr0; + return noValArr; } /** * @return Has value filter. */ public CacheEntryPredicate[] hasValArray() { - return noValArr0; - } - - /** - * @param val Value to check. - * @return Predicate array that checks for value. - */ - @SuppressWarnings({"unchecked"}) - public IgnitePredicate<Cache.Entry<K, V>>[] equalsPeekArray(V val) { - assert val != null; - - return new IgnitePredicate[]{F.cacheContainsPeek(val)}; + return hasValArr; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 518df69..004cc63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1391,7 +1391,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Apply metrics. if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { // PutIfAbsent methods mustn't update hit/miss statistics - if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noValArray()) + if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter)) cctx.cache().metrics0().onRead(old != null); } @@ -1400,7 +1400,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean pass = cctx.isAll(this, filter); if (!pass) { - if (expiryPlc != null && !readThrough && filter != cctx.noValArray() && hasValueUnlocked()) + if (expiryPlc != null && !readThrough && !cctx.putIfAbsentFilter(filter) && hasValueUnlocked()) updateTtl(expiryPlc); return new T3<>(false, retval ? CU.value(old, cctx, false) : null, null); @@ -1834,7 +1834,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Apply metrics. if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { // PutIfAbsent methods mustn't update hit/miss statistics - if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noValArray()) + if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter)) cctx.cache().metrics0().onRead(oldVal != null); } @@ -1843,7 +1843,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean pass = cctx.isAll(this, filter); if (!pass) { - if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noValArray()) + if (expiryPlc != null && !readThrough && hasValueUnlocked() && !cctx.putIfAbsentFilter(filter)) updateTtl(expiryPlc); return new GridCacheUpdateAtomicResult(false, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 0e72011..ec9ca1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -168,18 +168,6 @@ public abstract class GridCacheMessage implements Message { } /** - * @param filters Predicate filters. - * @param ctx Context. - * @throws IgniteCheckedException If failed. - */ - protected final void prepareFilter(@Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filters, - GridCacheSharedContext ctx) throws IgniteCheckedException { - if (filters != null) - for (IgnitePredicate filter : filters) - prepareObject(filter, ctx); - } - - /** * @param o Object to prepare for marshalling. * @param ctx Context. * @throws IgniteCheckedException If failed. @@ -208,18 +196,6 @@ public abstract class GridCacheMessage implements Message { } /** - * @param col Collection of objects to prepare for marshalling. - * @param ctx Cache context. - * @throws IgniteCheckedException If failed. - */ - protected final void prepareObjects(@Nullable Iterable<?> col, GridCacheSharedContext ctx) - throws IgniteCheckedException { - if (col != null) - for (Object o : col) - prepareObject(o, ctx); - } - - /** * @param depInfo Deployment to set. * @see GridCacheDeployable#prepare(GridDeploymentInfo) */ @@ -346,13 +322,6 @@ public abstract class GridCacheMessage implements Message { for (IgniteTxEntry e : txEntries) { e.marshal(ctx, transferExpiry); - if (e.filters() != null) { - GridCacheContext cctx = ctx.cacheContext(e.cacheId()); - - for (CacheEntryPredicate p : e.filters()) - p.prepareMarshal(cctx); - } - if (ctx.deploymentEnabled()) { prepareObject(e.key(), ctx); prepareObject(e.value(), ctx); @@ -382,16 +351,8 @@ public abstract class GridCacheMessage implements Message { assert ctx != null; if (txEntries != null) { - for (IgniteTxEntry e : txEntries) { + for (IgniteTxEntry e : txEntries) e.unmarshal(ctx, near, ldr); - - if (e.filters() != null) { - GridCacheContext cctx = ctx.cacheContext(e.cacheId()); - - for (CacheEntryPredicate p : e.filters()) - p.finishUnmarshal(cctx, ldr); - } - } } } @@ -450,65 +411,6 @@ public abstract class GridCacheMessage implements Message { } /** - * @param filter Collection to marshal. - * @param ctx Context. - * @return Marshalled collection. - * @throws IgniteCheckedException If failed. - */ - @Nullable protected final <T> byte[][] marshalFilter( - @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter, - GridCacheSharedContext ctx) - throws IgniteCheckedException - { - assert ctx != null; - - if (filter == null) - return null; - - byte[][] filterBytes = new byte[filter.length][]; - - for (int i = 0; i < filter.length; i++) { - IgnitePredicate<Cache.Entry<Object, Object>> p = filter[i]; - - if (ctx.deploymentEnabled()) - prepareObject(p, ctx); - - filterBytes[i] = p == null ? null : CU.marshal(ctx, p); - } - - return filterBytes; - } - - /** - * @param byteCol Collection to unmarshal. - * @param ctx Context. - * @param ldr Loader. - * @return Unmarshalled collection. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings({"unchecked"}) - @Nullable protected final <T> IgnitePredicate<Cache.Entry<Object, Object>>[] unmarshalFilter( - @Nullable byte[][] byteCol, GridCacheSharedContext<Object, Object> ctx, ClassLoader ldr) - throws IgniteCheckedException - { - assert ldr != null; - assert ctx != null; - - if (byteCol == null) - return null; - - IgnitePredicate<Cache.Entry<Object, Object>>[] filter = new IgnitePredicate[byteCol.length]; - - Marshaller marsh = ctx.marshaller(); - - for (int i = 0; i < byteCol.length; i++) - filter[i] = byteCol[i] == null ? null : - marsh.<IgnitePredicate<Cache.Entry<Object, Object>>>unmarshal(byteCol[i], ldr); - - return filter; - } - - /** * @param col Collection to marshal. * @param ctx Context. * @return Marshalled collection. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 01ccddd..e4243ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -132,32 +132,26 @@ public class GridCacheUtils { /** Empty predicate array. */ private static final CacheEntryPredicate[] EMPTY_FILTER0 = new CacheEntryPredicate[0]; - /** Always false predicat array. */ - private static final IgnitePredicate[] ALWAYS_FALSE = new IgnitePredicate[] { - new P1() { - @Override public boolean apply(Object e) { - return false; - } - } - }; - /** */ - private static final CacheEntryPredicate[] ALWAYS_FALSE0 = new CacheEntryPredicate[] { + private static final CacheEntryPredicate ALWAYS_FALSE0 = new CacheEntrySerializablePredicate( new CacheEntryPredicateAdapter() { @Override public boolean apply(GridCacheEntryEx e) { return false; } } - }; + ); /** */ - private static final CacheEntryPredicate[] ALWAYS_TRUE0 = new CacheEntryPredicate[] { + private static final CacheEntryPredicate ALWAYS_TRUE0 = new CacheEntrySerializablePredicate( new CacheEntryPredicateAdapter() { @Override public boolean apply(GridCacheEntryEx e) { return true; } } - }; + ); + + /** */ + private static final CacheEntryPredicate[] ALWAYS_FALSE0_ARR = new CacheEntryPredicate[] {ALWAYS_FALSE0}; /** Read filter. */ private static final IgnitePredicate READ_FILTER = new P1<Object>() { @@ -780,16 +774,38 @@ public class GridCacheUtils { /** * @return Always false filter. */ - @SuppressWarnings({"unchecked"}) - public static <K, V> IgnitePredicate<Cache.Entry<K, V>>[] alwaysFalse() { - return (IgnitePredicate<Cache.Entry<K, V>>[])ALWAYS_FALSE; + public static CacheEntryPredicate alwaysFalse0() { + return ALWAYS_FALSE0; } /** * @return Always false filter. */ - public static CacheEntryPredicate[] alwaysFalse0() { - return ALWAYS_FALSE0; + public static CacheEntryPredicate alwaysTrue0() { + return ALWAYS_TRUE0; + } + + /** + * @return Always false filter. + */ + public static CacheEntryPredicate[] alwaysFalse0Arr() { + return ALWAYS_FALSE0_ARR; + } + + /** + * @param p Predicate. + * @return {@code True} if always false filter. + */ + public static boolean isAlwaysFalse0(@Nullable CacheEntryPredicate[] p) { + return p != null && p.length == 1 && p[0] == ALWAYS_FALSE0; + } + + /** + * @param p Predicate. + * @return {@code True} if always false filter. + */ + public static boolean isAlwaysTrue0(@Nullable CacheEntryPredicate[] p) { + return p != null && p.length == 1 && p[0] == ALWAYS_TRUE0; } /** @@ -1809,7 +1825,7 @@ public class GridCacheUtils { ) { return new CacheEntryPredicateAdapter() { @Override public boolean apply(GridCacheEntryEx e) { - return aff.isPrimary(n, e.key()); + return aff.isPrimary(n, e.key().value(e.context(), false)); } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 11101fe..fb4e0e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -349,7 +349,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu ret.value(val); } - if (hasFilters && !cacheCtx.isAll(cached.wrapLazyValue(), txEntry.filters())) { + if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) { if (expiry != null) txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 4d79f18..9deb63c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -374,6 +374,49 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ + @Override public Set<Cache.Entry<K, V>> primaryEntrySet( + @Nullable final CacheEntryPredicate... filter) { + final long topVer = ctx.affinity().affinityTopologyVersion(); + + Collection<Cache.Entry<K, V>> entries = + F.flatCollections( + F.viewReadOnly( + dht().topology().currentLocalPartitions(), + new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() { + @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) { + Collection<GridDhtCacheEntry> entries0 = p.entries(); + + if (!F.isEmpty(filter)) + entries0 = F.view(entries0, new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + return F.isAll(e, filter); + } + }); + + return F.viewReadOnly( + entries0, + new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() { + @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) { + return e.wrapLazyValue(); + } + }, + new P1<GridDhtCacheEntry>() { + @Override public boolean apply(GridDhtCacheEntry e) { + return !e.obsoleteOrDeleted(); + } + }); + } + }, + new P1<GridDhtLocalPartition>() { + @Override public boolean apply(GridDhtLocalPartition p) { + return p.primary(topVer); + } + })); + + return new GridCacheEntrySet<>(ctx, entries, null); + } + + /** {@inheritDoc} */ @Override public Set<K> keySet(@Nullable CacheEntryPredicate[] filter) { return new GridCacheKeySet<>(ctx, entrySet(filter), null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 67001aa..3895924 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1035,7 +1035,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (inTx() && implicitTx() && tx.onePhaseCommit()) { boolean pass = res.filterResult(i); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0()); + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); } if (record) { @@ -1390,7 +1390,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (inTx() && implicitTx() && tx.onePhaseCommit()) { boolean pass = res.filterResult(i); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0()); + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); } entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index d34f5a7..c027d1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -49,7 +49,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest { private IgniteUuid miniId; /** Filter. */ - @GridDirectTransient private CacheEntryPredicate[] filter; /** Implicit flag. */ @@ -307,8 +306,10 @@ public class GridNearLockRequest extends GridDistributedLockRequest { if (filter != null) { GridCacheContext cctx = ctx.cacheContext(cacheId); - for (CacheEntryPredicate p : filter) - p.prepareMarshal(cctx); + for (CacheEntryPredicate p : filter) { + if (p != null) + p.prepareMarshal(cctx); + } } } @@ -319,8 +320,10 @@ public class GridNearLockRequest extends GridDistributedLockRequest { if (filter != null) { GridCacheContext cctx = ctx.cacheContext(cacheId); - for (CacheEntryPredicate p : filter) - p.finishUnmarshal(cctx, ldr); + for (CacheEntryPredicate p : filter) { + if (p != null) + p.finishUnmarshal(cctx, ldr); + } } } @@ -351,6 +354,12 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); + case 24: + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + case 25: if (!writer.writeBoolean("hasTransforms", hasTransforms)) return false; @@ -437,6 +446,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); + case 24: + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 25: hasTransforms = reader.readBoolean("hasTransforms"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index c5f55d4..e2b843b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -66,13 +66,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache private byte[] keyValFilterBytes; /** */ - @GridDirectTransient private CacheEntryPredicate prjFilter; /** */ - private byte[] prjFilterBytes; - - /** */ @GridDirectTransient private IgniteReducer<Object, Object> rdc; @@ -249,7 +245,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache if (ctx.deploymentEnabled()) prepareObject(prjFilter, ctx); - prjFilterBytes = CU.marshal(ctx, prjFilter); + prjFilter.prepareMarshal(ctx.cacheContext(cacheId)); } if (rdc != null) { @@ -285,8 +281,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache if (keyValFilterBytes != null) keyValFilter = mrsh.unmarshal(keyValFilterBytes, ldr); - if (prjFilterBytes != null) - prjFilter = mrsh.unmarshal(prjFilterBytes, ldr); + if (prjFilter != null) + prjFilter.finishUnmarshal(ctx.cacheContext(cacheId), ldr); if (rdcBytes != null) rdc = mrsh.unmarshal(rdcBytes, ldr); @@ -534,7 +530,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 16: - if (!writer.writeByteArray("prjFilterBytes", prjFilterBytes)) + if (!writer.writeMessage("prjFilter", prjFilter)) return false; writer.incrementState(); @@ -690,7 +686,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 16: - prjFilterBytes = reader.readByteArray("prjFilterBytes"); + prjFilter = reader.readMessage("prjFilter"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 82376b2..5e6f2bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -684,6 +684,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim * @throws IgniteCheckedException If failed. */ public void marshal(GridCacheSharedContext<?, ?> ctx, boolean transferExpiry) throws IgniteCheckedException { + if (filters != null) { + for (CacheEntryPredicate p : filters) { + if (p != null) + p.prepareMarshal(ctx.cacheContext(cacheId)); + } + } + // Do not serialize filters if they are null. if (depEnabled) { if (transformClosBytes == null && entryProcessorsCol != null) @@ -733,6 +740,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim if (filters == null) filters = CU.empty0(); + else { + for (CacheEntryPredicate p : filters) { + if (p != null) + p.finishUnmarshal(ctx.cacheContext(cacheId), clsLdr); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 0f24ad6..a0898f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2398,7 +2398,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ret.value(v); } - boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached.wrapLazyValue(), filter);; + boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter); // For remove operation we return true only if we are removing s/t, // i.e. cached value is not null. @@ -2423,7 +2423,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.filters(CU.empty0()); txEntry.filtersSet(false); - updateTtl = filter != cacheCtx.noPeekArray(); + updateTtl = !cacheCtx.putIfAbsentFilter(filter); } if (updateTtl) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/main/java/org/apache/ignite/internal/util/F0.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/F0.java b/modules/core/src/main/java/org/apache/ignite/internal/util/F0.java index ce3815c..a7faa0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/F0.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/F0.java @@ -104,15 +104,93 @@ public class F0 { }; } + /** + * @param p1 Filter1. + * @param p2 Filter2. + * @return And filter. + */ public static CacheEntryPredicate and0(@Nullable final CacheEntryPredicate[] p1, @Nullable final CacheEntryPredicate... p2) { - return null; + if (CU.isAlwaysFalse0(p1) || CU.isAlwaysFalse0(p2)) + return CU.alwaysFalse0(); + + if (CU.isAlwaysTrue0(p1) && CU.isAlwaysTrue0(p2)) + return CU.alwaysTrue0(); + + final boolean e1 = F.isEmpty(p1); + final boolean e2 = F.isEmpty(p2); + + if (e1 && e2) + return CU.alwaysTrue0(); + + if (e1 && !e2) { + assert p2 != null; + + if (p2.length == 1) + return p2[0]; + } + + if (!e1 && e2) { + assert p1 != null; + + if (p1.length == 1) + return p1[0]; + } + + return new CacheEntrySerializablePredicate(new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + if (!e1) { + assert p1 != null; + + for (CacheEntryPredicate p : p1) + if (p != null && !p.apply(e)) + return false; + } + + if (!e2) { + assert p2 != null; + + for (CacheEntryPredicate p : p2) + if (p != null && !p.apply(e)) + return false; + } + + return true; + } + }); } + /** + * @param p Filter1. + * @param ps Filter2. + * @return And filter. + */ public static CacheEntryPredicate and0( @Nullable final CacheEntryPredicate p, @Nullable final CacheEntryPredicate... ps) { - return null; + if (p == null && F.isEmptyOrNulls(ps)) + return CU.alwaysTrue0(); + + if (F.isAlwaysFalse(p) && F.isAlwaysFalse(ps)) + return CU.alwaysFalse0(); + + if (F.isAlwaysTrue(p) && F.isAlwaysTrue(ps)) + return CU.alwaysTrue0(); + + return new CacheEntrySerializablePredicate(new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + assert ps != null; + + if (p != null && !p.apply(e)) + return false; + + for (CacheEntryPredicate p : ps) + if (p != null && !p.apply(e)) + return false; + + return true; + } + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d40c422/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index f816d75..5972f75 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -3441,7 +3441,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract locKeys.addAll(cache.keySet(new CacheEntryPredicateAdapter() { @Override public boolean apply(GridCacheEntryEx e) { - return grid(0).affinity(null).isBackup(grid(0).localNode(), e.key()); + return grid(0).affinity(null).isBackup(grid(0).localNode(), e.key().value(e.context(), false)); } })); @@ -3896,19 +3896,30 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertFalse(cache.iterator().hasNext()); - final int SIZE = 20000; + final int SIZE = 10_000; Map<String, Integer> entries = new HashMap<>(); + Map<String, Integer> putMap = new HashMap<>(); + for (int i = 0; i < SIZE; ++i) { - cache.put(Integer.toString(i), i); + String key = Integer.toString(i); + + putMap.put(key, i); + + entries.put(key, i); - entries.put(Integer.toString(i), i); + if (putMap.size() == 500) { + cache.putAll(putMap); - if (i > 0 && i % 500 == 0) - info("Puts finished: " + i); + info("Puts finished: " + (i + 1)); + + putMap.clear(); + } } + cache.putAll(putMap); + checkIteratorHasNext(); checkIteratorCache(entries);