# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/783e5270 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/783e5270 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/783e5270 Branch: refs/heads/ignite-41 Commit: 783e52703147a6a910b9d2e09547db2a49f511e9 Parents: 3397461 Author: sboikov <sboi...@gridgain.com> Authored: Mon Dec 22 11:30:30 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Dec 22 13:31:22 2014 +0300 ---------------------------------------------------------------------- .../cache/GridCacheAccessExpiryPolicy.java | 131 --------------- .../processors/cache/GridCacheAdapter.java | 118 ++++++++++++- .../processors/cache/GridCacheEntryEx.java | 4 +- .../processors/cache/GridCacheExpiryPolicy.java | 56 ------- .../processors/cache/GridCacheMapEntry.java | 24 ++- .../processors/cache/GridCacheMessage.java | 14 +- .../processors/cache/GridCacheProxyImpl.java | 2 +- .../cache/IgniteCacheExpiryPolicy.java | 60 +++++++ .../GridCacheExternalizableExpiryPolicy.java | 153 ----------------- .../GridDistributedTxRemoteAdapter.java | 2 +- .../IgniteExternalizableExpiryPolicy.java | 166 +++++++++++++++++++ .../distributed/dht/GridDhtCacheAdapter.java | 12 +- .../cache/distributed/dht/GridDhtGetFuture.java | 4 +- .../distributed/dht/GridDhtTxFinishRequest.java | 2 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 38 ++++- .../cache/distributed/dht/GridDhtTxRemote.java | 1 + .../dht/GridPartitionedGetFuture.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 61 +++---- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 12 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 60 ++++--- .../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 4 +- .../distributed/near/GridNearAtomicCache.java | 4 +- .../distributed/near/GridNearCacheAdapter.java | 4 +- .../distributed/near/GridNearGetFuture.java | 4 +- .../near/GridNearTransactionalCache.java | 1 + .../local/atomic/GridLocalAtomicCache.java | 8 +- .../cache/transactions/IgniteTxEntry.java | 17 +- .../transactions/IgniteTxLocalAdapter.java | 119 ++++++++----- .../handlers/cache/GridCacheCommandHandler.java | 7 +- .../GridTcpCommunicationMessageFactory.java | 2 +- .../cache/IgniteCacheAbstractTest.java | 47 ++++++ ...maryWriteOrderWithStoreExpiryPolicyTest.java | 23 +++ ...iteCacheAtomicWithStoreExpiryPolicyTest.java | 22 +++ .../IgniteCacheExpiryPolicyAbstractTest.java | 50 +++++- .../IgniteCacheExpiryPolicyTestSuite.java | 3 + .../IgniteCacheTxWithStoreExpiryPolicyTest.java | 22 +++ .../cache/GridCacheAbstractFullApiSelfTest.java | 11 +- .../cache/GridCacheBasicApiAbstractTest.java | 3 +- .../GridCacheRefreshAheadAbstractSelfTest.java | 5 +- .../processors/cache/GridCacheTestEntryEx.java | 4 +- .../cache/GridCacheTtlManagerLoadTest.java | 3 +- .../cache/GridCacheTtlManagerSelfTest.java | 3 +- .../GridCacheBasicOpAbstractTest.java | 2 +- ...heExpiredEntriesPreloadAbstractSelfTest.java | 3 +- ...tomicClientOnlyMultiNodeFullApiSelfTest.java | 5 +- ...eAtomicNearOnlyMultiNodeFullApiSelfTest.java | 3 +- .../GridCachePartitionedEvictionSelfTest.java | 4 +- .../hadoop/jobtracker/GridHadoopJobTracker.java | 2 +- .../cache/GridCacheAbstractQuerySelfTest.java | 2 +- .../cache/websession/GridWebSessionFilter.java | 5 +- .../websession/GridWebSessionListener.java | 5 +- 53 files changed, 782 insertions(+), 547 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java deleted file mode 100644 index 4f30a95..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java +++ /dev/null @@ -1,131 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.lang.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import javax.cache.expiry.*; -import java.util.*; - -/** - * - */ -public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy { - /** */ - private final long accessTtl; - - /** */ - private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; - - /** */ - private Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrsMap; - - /** - * @param expiryPlc Expiry policy. - * @return Access expire policy. - */ - public static GridCacheAccessExpiryPolicy forPolicy(@Nullable ExpiryPolicy expiryPlc) { - if (expiryPlc == null) - return null; - - Duration duration = expiryPlc.getExpiryForAccess(); - - if (duration == null) - return null; - - return new GridCacheAccessExpiryPolicy(GridCacheUtils.toTtl(duration)); - } - - /** - * @param accessTtl TTL for access. - */ - public GridCacheAccessExpiryPolicy(long accessTtl) { - assert accessTtl >= 0 : accessTtl; - - this.accessTtl = accessTtl; - } - - /** {@inheritDoc} */ - @Override public long forAccess() { - return accessTtl; - } - - /** {@inheritDoc} */ - @Override public long forCreate() { - return -1L; - } - - /** {@inheritDoc} */ - @Override public long forUpdate() { - return -1L; - } - - /** - * - */ - public synchronized void reset() { - if (entries != null) - entries.clear(); - - if (rdrsMap != null) - rdrsMap.clear(); - } - - /** - * @param key Entry key. - * @param keyBytes Entry key bytes. - * @param ver Entry version. - */ - @SuppressWarnings("unchecked") - @Override public synchronized void onAccessUpdated(Object key, - byte[] keyBytes, - GridCacheVersion ver, - @Nullable Collection<UUID> rdrs) { - if (entries == null) - entries = new HashMap<>(); - - IgniteBiTuple<byte[], GridCacheVersion> t = new IgniteBiTuple<>(keyBytes, ver); - - entries.put(key, t); - - if (rdrs != null && !rdrs.isEmpty()) { - if (rdrsMap == null) - rdrsMap = new HashMap<>(); - - for (UUID nodeId : rdrs) { - Collection<IgniteBiTuple<byte[], GridCacheVersion>> col = rdrsMap.get(nodeId); - - if (col == null) - rdrsMap.put(nodeId, col = new ArrayList<>()); - - col.add(t); - } - } - } - - /** - * @return TTL update request. - */ - @Nullable @Override public synchronized Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { - return entries; - } - - /** {@inheritDoc} */ - @Nullable @Override public synchronized Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() { - return rdrsMap; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheAccessExpiryPolicy.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index d7ba092..3780cbb 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -1755,7 +1755,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im taskName, deserializePortable, forcePrimary, - GridCacheAccessExpiryPolicy.forPolicy(expiryPlc), + GetExpiryPolicy.forPolicy(expiryPlc), filter); } @@ -1767,7 +1767,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im final String taskName, final boolean deserializePortable, final boolean forcePrimary, - @Nullable GridCacheExpiryPolicy expiry, + @Nullable IgniteCacheExpiryPolicy expiry, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter ) { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); @@ -4847,4 +4847,118 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im this.fut = fut; } } + + /** + * + */ + protected static class GetExpiryPolicy implements IgniteCacheExpiryPolicy { + /** */ + private final long accessTtl; + + /** */ + private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; + + /** */ + private Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrsMap; + + /** + * @param expiryPlc Expiry policy. + * @return Access expire policy. + */ + public static GetExpiryPolicy forPolicy(@Nullable ExpiryPolicy expiryPlc) { + if (expiryPlc == null) + return null; + + Duration duration = expiryPlc.getExpiryForAccess(); + + if (duration == null) + return null; + + return new GetExpiryPolicy(CU.toTtl(duration)); + } + + /** + * @param accessTtl TTL for access. + */ + public GetExpiryPolicy(long accessTtl) { + assert accessTtl >= 0 : accessTtl; + + this.accessTtl = accessTtl; + } + + /** {@inheritDoc} */ + @Override public long forAccess() { + return accessTtl; + } + + /** {@inheritDoc} */ + @Override public long forCreate() { + return -1L; + } + + /** {@inheritDoc} */ + @Override public long forUpdate() { + return -1L; + } + + /** + * + */ + public synchronized void reset() { + if (entries != null) + entries.clear(); + + if (rdrsMap != null) + rdrsMap.clear(); + } + + /** + * @param key Entry key. + * @param keyBytes Entry key bytes. + * @param ver Entry version. + */ + @SuppressWarnings("unchecked") + @Override public synchronized void ttlUpdated(Object key, + byte[] keyBytes, + GridCacheVersion ver, + @Nullable Collection<UUID> rdrs) { + if (entries == null) + entries = new HashMap<>(); + + IgniteBiTuple<byte[], GridCacheVersion> t = new IgniteBiTuple<>(keyBytes, ver); + + entries.put(key, t); + + if (rdrs != null && !rdrs.isEmpty()) { + if (rdrsMap == null) + rdrsMap = new HashMap<>(); + + for (UUID nodeId : rdrs) { + Collection<IgniteBiTuple<byte[], GridCacheVersion>> col = rdrsMap.get(nodeId); + + if (col == null) + rdrsMap.put(nodeId, col = new ArrayList<>()); + + col.add(t); + } + } + } + + /** + * @return TTL update request. + */ + @Nullable @Override public synchronized Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { + return entries; + } + + /** {@inheritDoc} */ + @Nullable @Override public synchronized Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() { + return rdrsMap; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GetExpiryPolicy.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index 6cf055b..5fb0b95 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -293,7 +293,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { Object transformClo, String taskName, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheExpiryPolicy expiryPlc) + @Nullable IgniteCacheExpiryPolicy expiryPlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException; /** @@ -426,7 +426,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { @Nullable byte[] valBytes, boolean writeThrough, boolean retval, - @Nullable GridCacheExpiryPolicy expiryPlc, + @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java deleted file mode 100644 index 0328d51..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java +++ /dev/null @@ -1,56 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * - */ -public interface GridCacheExpiryPolicy { - /** - * @return TTL. - */ - public abstract long forCreate(); - - /** - * @return TTL. - */ - public abstract long forUpdate(); - - /** - * @return TTL. - */ - public abstract long forAccess(); - - /** - * @param key Entry key. - * @param keyBytes Entry key bytes. - * @param ver Entry version. - * @param rdrs Entry readers. - */ - public void onAccessUpdated(Object key, - byte[] keyBytes, - GridCacheVersion ver, - @Nullable Collection<UUID> rdrs); - - /** - * @return Entries with TTL updated on access. - */ - @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries(); - - /** - * @return Readers for updated entries. - */ - @Nullable Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 6dc65aa..4bfe91e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -701,7 +701,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> Object transformClo, String taskName, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheExpiryPolicy expirePlc) + @Nullable IgniteCacheExpiryPolicy expirePlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { cctx.denyOnFlag(LOCAL); @@ -734,7 +734,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> Object transformClo, String taskName, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheExpiryPolicy expiryPlc) + @Nullable IgniteCacheExpiryPolicy expiryPlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { // Disable read-through if there is no store. if (readThrough && !cctx.isStoreEnabled()) @@ -885,14 +885,12 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (ret != null && expiryPlc != null) { long ttl = expiryPlc.forAccess(); - assert ttl >= 0 : ttl; - updateTtl(ttl); - expiryPlc.onAccessUpdated(key(), + expiryPlc.ttlUpdated(key(), getOrMarshalKeyBytes(), version(), - hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); + hasReaders() ? ((GridDhtCacheEntry) this).readers() : null); } } @@ -1520,7 +1518,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (!pass) { if (expiryPlc != null && hasValueUnlocked()) { - long ttl = GridCacheUtils.toTtl(expiryPlc.getExpiryForAccess()); + long ttl = CU.toTtl(expiryPlc.getExpiryForAccess()); if (ttl != -1L) updateTtl(ttl); @@ -1584,7 +1582,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> long expireTime; if (expiryPlc != null) { - ttl = GridCacheUtils.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation()); + ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation()); if (ttl == -1L) { ttl = ttlExtras(); @@ -1687,7 +1685,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Nullable byte[] valBytes, boolean writeThrough, boolean retval, - @Nullable GridCacheExpiryPolicy expiryPlc, + @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, @@ -1833,10 +1831,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (newTtl != -1L) { updateTtl(newTtl); - expiryPlc.onAccessUpdated(key, + expiryPlc.ttlUpdated(key, getOrMarshalKeyBytes(), version(), - hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); + hasReaders() ? ((GridDhtCacheEntry) this).readers() : null); } } @@ -2498,11 +2496,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> * @param ttl Time to live. */ private void updateTtl(long ttl) { + assert ttl >= 0 : ttl; assert Thread.holdsLock(this); - if (ttl == -1L) - return; - long expireTime = toExpireTime(ttl); long oldExpireTime = expireTimeExtras(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java index 95cccdb..ab98dcb 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java @@ -334,10 +334,7 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage boolean transferExpiry = transferExpiryPolicy(); for (IgniteTxEntry<K, V> e : txEntries) { - if (transferExpiry) - e.transferExpiryPolicyIfNeeded(); - - e.marshal(ctx); + e.marshal(ctx, transferExpiry); if (ctx.deploymentEnabled()) { prepareObject(e.key(), ctx); @@ -348,6 +345,9 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage } } + /** + * @return {@code True} if entries expire policy should be marshalled. + */ protected boolean transferExpiryPolicy() { return false; } @@ -358,8 +358,10 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage * @param ldr Loader. * @throws IgniteCheckedException If failed. */ - protected final void unmarshalTx(Iterable<IgniteTxEntry<K, V>> txEntries, boolean near, - GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + protected final void unmarshalTx(Iterable<IgniteTxEntry<K, V>> txEntries, + boolean near, + GridCacheSharedContext<K, V> ctx, + ClassLoader ldr) throws IgniteCheckedException { assert ldr != null; assert ctx != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index ae76341..136e078 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -1882,7 +1882,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Nullable @Override public ExpiryPolicy expiry() { - throw new UnsupportedOperationException(); + return delegate.expiry(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java new file mode 100644 index 0000000..59cd937 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java @@ -0,0 +1,60 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import javax.cache.expiry.*; +import java.util.*; + +/** + * Wrapper for {@link ExpiryPolicy} used to track information about cache entries + * whose time to live was modified after access. + */ +public interface IgniteCacheExpiryPolicy { + /** + * @return TTL. + */ + public long forCreate(); + + /** + * @return TTL. + */ + public long forUpdate(); + + /** + * @return TTL. + */ + public long forAccess(); + + /** + * Callback when entry's ttl is updated on access. + * + * @param key Entry key. + * @param keyBytes Entry key bytes. + * @param ver Entry version. + * @param rdrs Entry readers. + */ + public void ttlUpdated(Object key, + byte[] keyBytes, + GridCacheVersion ver, + @Nullable Collection<UUID> rdrs); + + /** + * @return Entries with TTL updated on access. + */ + @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries(); + + /** + * @return Readers for updated entries. + */ + @Nullable Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java deleted file mode 100644 index 75da5de..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java +++ /dev/null @@ -1,153 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import javax.cache.expiry.*; -import java.io.*; -import java.util.concurrent.*; - -/** - * - */ -public class GridCacheExternalizableExpiryPolicy implements ExpiryPolicy, Externalizable { - /** */ - private ExpiryPolicy plc; - - /** */ - private static final byte CREATE_TTL_MASK = 0x01; - - /** */ - private static final byte UPDATE_TTL_MASK = 0x02; - - /** */ - private static final byte ACCESS_TTL_MASK = 0x04; - - /** */ - private Duration forCreate; - - /** */ - private Duration forUpdate; - - /** */ - private Duration forAccess; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheExternalizableExpiryPolicy() { - // No-op. - } - - /** - * @param plc Expiry policy. - */ - public GridCacheExternalizableExpiryPolicy(ExpiryPolicy plc) { - assert plc != null; - - this.plc = plc; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForCreation() { - return forCreate; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForAccess() { - return forAccess; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForUpdate() { - return forUpdate; - } - - /** - * @param out Output stream. - * @param duration Duration. - * @throws IOException If failed. - */ - private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException { - if (duration != null) { - if (duration.isEternal()) - out.writeLong(0); - else if (duration.getDurationAmount() == 0) - out.writeLong(1); - else - out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount())); - } - } - - /** - * @param in Input stream. - * @return Duration. - * @throws IOException If failed. - */ - private Duration readDuration(ObjectInput in) throws IOException { - long ttl = in.readLong(); - - assert ttl >= 0; - - if (ttl == 0) - return Duration.ETERNAL; - - return new Duration(TimeUnit.MILLISECONDS, ttl); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - byte flags = 0; - - Duration create = plc.getExpiryForCreation(); - - if (create != null) - flags |= CREATE_TTL_MASK; - - Duration update = plc.getExpiryForUpdate(); - - if (update != null) - flags |= UPDATE_TTL_MASK; - - Duration access = plc.getExpiryForAccess(); - - if (access != null) - flags |= ACCESS_TTL_MASK; - - out.writeByte(flags); - - writeDuration(out, create); - - writeDuration(out, update); - - writeDuration(out, access); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - byte flags = in.readByte(); - - if ((flags & CREATE_TTL_MASK) != 0) - forCreate = readDuration(in); - - if ((flags & UPDATE_TTL_MASK) != 0) - forUpdate = readDuration(in); - - if ((flags & ACCESS_TTL_MASK) != 0) - forAccess = readDuration(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheExternalizableExpiryPolicy.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 3c98929..1abf714 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -587,7 +587,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> Duration duration = expiry.getExpiryForAccess(); if (duration != null) - cached.updateTtl(null, GridCacheUtils.toTtl(duration)); + cached.updateTtl(null, CU.toTtl(duration)); } if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java new file mode 100644 index 0000000..5fb8f97 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java @@ -0,0 +1,166 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.distributed; + +import org.apache.ignite.marshaller.optimized.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.expiry.*; +import java.io.*; +import java.util.concurrent.*; + +/** + * Externalizable wrapper for {@link ExpiryPolicy}. + */ +public class IgniteExternalizableExpiryPolicy implements ExpiryPolicy, Externalizable, IgniteOptimizedMarshallable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"}) + private static Object GG_CLASS_ID; + + /** */ + private ExpiryPolicy plc; + + /** */ + private static final byte CREATE_TTL_MASK = 0x01; + + /** */ + private static final byte UPDATE_TTL_MASK = 0x02; + + /** */ + private static final byte ACCESS_TTL_MASK = 0x04; + + /** */ + private Duration forCreate; + + /** */ + private Duration forUpdate; + + /** */ + private Duration forAccess; + + /** + * Required by {@link Externalizable}. + */ + public IgniteExternalizableExpiryPolicy() { + // No-op. + } + + /** + * @param plc Expiry policy. + */ + public IgniteExternalizableExpiryPolicy(ExpiryPolicy plc) { + assert plc != null; + + this.plc = plc; + } + + /** {@inheritDoc} */ + @Override public Object ggClassId() { + return GG_CLASS_ID; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForCreation() { + return forCreate; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForAccess() { + return forAccess; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForUpdate() { + return forUpdate; + } + + /** + * @param out Output stream. + * @param duration Duration. + * @throws IOException If failed. + */ + private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException { + if (duration != null) { + if (duration.isEternal()) + out.writeLong(0); + else if (duration.getDurationAmount() == 0) + out.writeLong(1); + else + out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount())); + } + } + + /** + * @param in Input stream. + * @return Duration. + * @throws IOException If failed. + */ + private Duration readDuration(ObjectInput in) throws IOException { + long ttl = in.readLong(); + + assert ttl >= 0; + + if (ttl == 0) + return Duration.ETERNAL; + + return new Duration(TimeUnit.MILLISECONDS, ttl); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + byte flags = 0; + + Duration create = plc.getExpiryForCreation(); + + if (create != null) + flags |= CREATE_TTL_MASK; + + Duration update = plc.getExpiryForUpdate(); + + if (update != null) + flags |= UPDATE_TTL_MASK; + + Duration access = plc.getExpiryForAccess(); + + if (access != null) + flags |= ACCESS_TTL_MASK; + + out.writeByte(flags); + + writeDuration(out, create); + + writeDuration(out, update); + + writeDuration(out, access); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + byte flags = in.readByte(); + + if ((flags & CREATE_TTL_MASK) != 0) + forCreate = readDuration(in); + + if ((flags & UPDATE_TTL_MASK) != 0) + forUpdate = readDuration(in); + + if ((flags & ACCESS_TTL_MASK) != 0) + forAccess = readDuration(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteExternalizableExpiryPolicy.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 41e56c5..3a88e16 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -428,7 +428,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * This method is used internally. Use - * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], org.gridgain.grid.kernal.processors.cache.GridCacheExpiryPolicy)} + * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], org.gridgain.grid.kernal.processors.cache.IgniteCacheExpiryPolicy)} * method instead to retrieve DHT value. * * @param keys {@inheritDoc} @@ -483,7 +483,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap String taskName, boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry ) { return getAllAsync(keys, null, @@ -518,7 +518,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap int taskNameHash, boolean deserializePortable, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheExpiryPolicy expiry) { + @Nullable IgniteCacheExpiryPolicy expiry) { GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, msgId, reader, @@ -546,7 +546,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap long ttl = req.accessTtl(); - final GridCacheAccessExpiryPolicy expiryPlc = ttl == -1L ? null : new GridCacheAccessExpiryPolicy(ttl); + final GetExpiryPolicy expiryPlc = ttl == -1L ? null : new GetExpiryPolicy(ttl); IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut = getDhtAsync(nodeId, @@ -599,7 +599,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * @param expiryPlc Expiry policy. */ - public void sendTtlUpdateRequest(@Nullable final GridCacheExpiryPolicy expiryPlc) { + public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy expiryPlc) { if (expiryPlc != null && expiryPlc.entries() != null) { ctx.closures().runLocalSafe(new Runnable() { @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"}) @@ -636,7 +636,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrs = expiryPlc.readers(); if (rdrs != null) { - assert !rdrs.isEmpty(); + assert !rdrs.isEmpty(); for (Map.Entry<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> e : rdrs.entrySet()) { ClusterNode node = ctx.node(e.getKey()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java index c2963b1..0920d5b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -85,7 +85,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col private boolean deserializePortable; /** Expiry policy. */ - private GridCacheExpiryPolicy expiryPlc; + private IgniteCacheExpiryPolicy expiryPlc; /** * Empty constructor required for {@link Externalizable}. @@ -120,7 +120,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col @Nullable UUID subjId, int taskNameHash, boolean deserializePortable, - @Nullable GridCacheExpiryPolicy expiryPlc) { + @Nullable IgniteCacheExpiryPolicy expiryPlc) { super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer()); assert reader != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 8196fec..ecb55cc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -79,7 +79,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest /** TTLs for optimistic transaction. */ private GridLongList ttls; - /** TTLs for optimistic transaction. */ + /** Near cache TTLs for optimistic transaction. */ private GridLongList nearTtls; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 8913c28..d20a4a3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -126,10 +126,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K * @param msgId Message ID. * @param cached Cached entry. * @param entry Transaction entry. + * @param topVer Topology version. * @return {@code True} if reader was added as a result of this call. */ - @Nullable protected abstract IgniteFuture<Boolean> addReader(long msgId, GridDhtCacheEntry<K, V> cached, - IgniteTxEntry<K, V> entry, long topVer); + @Nullable protected abstract IgniteFuture<Boolean> addReader(long msgId, + GridDhtCacheEntry<K, V> cached, + IgniteTxEntry<K, V> entry, + long topVer); /** * @param commit Commit flag. @@ -520,7 +523,15 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K IgniteTxEntry<K, V> w = writeEntries == null ? null : writeEntries.get(idx++); - txEntry = addEntry(read ? READ : NOOP, null, null, cached, null, CU.<K, V>empty(), false, -1L, -1L, + txEntry = addEntry(NOOP, + null, + null, + cached, + null, + CU.<K, V>empty(), + false, + -1L, + -1L, drVers != null ? drVers[drVerIdx++] : null); if (w != null) { @@ -572,6 +583,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K } /** + * @param cacheCtx Context. * @param ret Return value. * @param passedKeys Passed keys. * @param read {@code True} if read. @@ -583,7 +595,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K final GridCacheContext<K, V> cacheCtx, GridCacheReturn<V> ret, final Collection<? extends K> passedKeys, - boolean read, + final boolean read, final Set<K> skipped, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { if (log.isDebugEnabled()) @@ -596,7 +608,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K GridDhtTransactionalCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); IgniteFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, - lockTimeout(), this, isInvalidate(), read, /*retval*/false, isolation, CU.<K, V>empty()); + lockTimeout(), + this, + isInvalidate(), + read, + /*retval*/false, + isolation, + CU.<K, V>empty()); return new GridEmbeddedFuture<>( fut, @@ -605,7 +623,15 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K if (log.isDebugEnabled()) log.debug("Acquired transaction lock on keys: " + passedKeys); - postLockWrite(cacheCtx, passedKeys, skipped, null, null, ret, /*remove*/false, /*retval*/false, + postLockWrite(cacheCtx, + passedKeys, + skipped, + null, + null, + ret, + /*remove*/false, + /*retval*/false, + /*read*/true, filter == null ? CU.<K, V>empty() : filter); return ret; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java index 5a179eb..a545bc4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -273,6 +273,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> } /** + * @param cacheCtx Cache context. * @param op Write operation. * @param key Key to add to write set. * @param keyBytes Key bytes. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 942d0c5..3942309 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -89,7 +89,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M private boolean deserializePortable; /** Expiry policy. */ - private GridCacheExpiryPolicy expiryPlc; + private IgniteCacheExpiryPolicy expiryPlc; /** * Empty constructor required for {@link Externalizable}. @@ -104,7 +104,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param topVer Topology version. * @param reload Reload flag. * @param forcePrimary If {@code true} then will force network trip to primary node even - * if called on backup node. + * if called on backup node. * @param filters Filters. * @param subjId Subject ID. * @param taskName Task name. @@ -121,7 +121,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M @Nullable UUID subjId, String taskName, boolean deserializePortable, - @Nullable GridCacheExpiryPolicy expiryPlc + @Nullable IgniteCacheExpiryPolicy expiryPlc ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d43cbdc..65099ef 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -107,8 +107,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override protected void init() { map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, - V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + @Override + public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash, + V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { return new GridDhtAtomicCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); } }); @@ -197,7 +198,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } ctx.io().addDisconnectListener(new GridDisconnectListener() { - @Override public void onNodeDisconnected(UUID nodeId) { + @Override + public void onNodeDisconnected(UUID nodeId) { scheduleAtomicFutureRecheck(); } }); @@ -271,7 +273,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null; return asyncOp(new CO<IgniteFuture<Map<K, V>>>() { - @Override public IgniteFuture<Map<K, V>> apply() { + @Override + public IgniteFuture<Map<K, V>> apply() { return getAllAsync0(keys, false, forcePrimary, @@ -691,7 +694,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskNameHash); return asyncOp(new CO<IgniteFuture<Object>>() { - @Override public IgniteFuture<Object> apply() { + @Override + public IgniteFuture<Object> apply() { updateFut.map(); return updateFut; @@ -730,8 +734,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long topVer = ctx.affinity().affinityTopologyVersion(); - final GridCacheAccessExpiryPolicy expiry = - GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + final GetExpiryPolicy expiry = + GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { @@ -894,7 +898,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - GridCacheExpiryPolicy expiry = null; + IgniteCacheExpiryPolicy expiry = null; try { // If batch store update is enabled, we need to lock all entries. @@ -944,7 +948,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean replicate = ctx.isDrEnabled(); - expiry = expiryPolicy(req.expiry() != null ? req.expiry() : ctx.expiry()); + ExpiryPolicy plc = req.expiry() != null ? req.expiry() : ctx.expiry(); + + if (plc != null) + expiry = new UpdateExpiryPolicy(plc); if (storeEnabled() && keys.size() > 1 && !ctx.dr().receiveEnabled()) { // This method can only be used when there are no replicated entries in the batch. @@ -1069,7 +1076,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, boolean replicate, String taskName, - @Nullable GridCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry ) throws GridCacheEntryRemovedException { // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue(); // Should not request return values for putAll. @@ -1112,7 +1119,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (ttl != -1L) { entry.updateTtl(null, ttl); - expiry.onAccessUpdated(entry.key(), + expiry.ttlUpdated(entry.key(), entry.getOrMarshalKeyBytes(), entry.version(), entry.readers()); @@ -1423,7 +1430,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, boolean replicate, String taskName, - @Nullable GridCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry ) throws GridCacheEntryRemovedException { GridCacheReturn<Object> retVal = null; Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted = null; @@ -1563,12 +1570,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { newValBytes = null; // If put the same value as in request then do not need to send it back. - if (op == TRANSFORM || writeVal != updRes.newValue()) + if (op == TRANSFORM || writeVal != updRes.newValue()) { res.addNearValue(i, updRes.newValue(), newValBytes, ttl, expireTime); + } else res.addNearTtl(i, ttl, expireTime); @@ -1630,6 +1638,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param res Response. * @param replicate Whether replication is enabled. * @param batchRes Batch update result. + * @param taskName Task name. * @param expiry Expiry policy. * @return Deleted entries. */ @@ -1650,7 +1659,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean replicate, UpdateBatchResult<K, V> batchRes, String taskName, - @Nullable GridCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry ) { assert putMap == null ^ rmvKeys == null; @@ -2230,11 +2239,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { UPDATE : DELETE; - long ttl = req.drTtl(i); + long ttl = req.ttl(i); long expireTime = req.drExpireTime(i); if (ttl != -1L && expireTime == -1L) - expireTime = GridCacheMapEntry.toExpireTime(ttl); + expireTime = CU.toExpireTime(ttl); GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate( ver, @@ -2245,7 +2254,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { valBytes, /*write-through*/false, /*retval*/false, - null, + /*expiry policy*/null, /*event*/true, /*metrics*/true, /*primary*/false, @@ -2415,14 +2424,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - /** - * @param plc Expiry policy. - * @return Expiry policy wrapper. - */ - private static GridCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) { - return plc == null ? null : new UpdateExpiryPolicy(plc); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtAtomicCache.class, this, super.toString()); @@ -2704,7 +2705,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * */ - private static class UpdateExpiryPolicy implements GridCacheExpiryPolicy { + private static class UpdateExpiryPolicy implements IgniteCacheExpiryPolicy { /** */ private final ExpiryPolicy plc; @@ -2739,10 +2740,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void onAccessUpdated(Object key, - byte[] keyBytes, - GridCacheVersion ver, - @Nullable Collection<UUID> rdrs) { + @Override public void ttlUpdated(Object key, + byte[] keyBytes, + GridCacheVersion ver, + @Nullable Collection<UUID> rdrs) { if (entries == null) entries = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 25bc875..8602ae3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; @@ -23,7 +22,6 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; -import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -199,7 +197,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param entry Entry to map. * @param val Value to write. * @param valBytes Value bytes. - * @param drTtl DR TTL (optional). + * @param transformC Transform closure. + * @param ttl TTL (optional). * @param drExpireTime DR expire time (optional). * @param drVer DR version (optional). */ @@ -207,7 +206,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> @Nullable V val, @Nullable byte[] valBytes, IgniteClosure<V, V> transformC, - long drTtl, + long ttl, long drExpireTime, @Nullable GridCacheVersion drVer) { long topVer = updateReq.topologyVersion(); @@ -247,7 +246,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> val, valBytes, transformC, - drTtl, + ttl, drExpireTime, drVer); } @@ -259,7 +258,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param entry Entry. * @param val Value. * @param valBytes Value bytes. - * @param TTL for near cache update (optional). + * @param transformC Transform closure. + * @param ttl TTL for near cache update (optional). * @param expireTime Expire time for near cache update (optional). */ public void addNearWriteEntries(Iterable<UUID> readers, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 0d44e03..2b68734 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -20,7 +20,6 @@ import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; -import javax.cache.expiry.*; import java.io.*; import java.nio.*; import java.util.*; @@ -71,10 +70,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> drVers; - /** DR TTLs. */ - private GridLongList drTtls; + /** TTLs. */ + private GridLongList ttls; - /** DR TTLs. */ + /** DR expire time. */ private GridLongList drExpireTimes; /** Near TTLs. */ @@ -156,6 +155,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param topVer Topology version. * @param forceTransformBackups Force transform backups flag. * @param subjId Subject ID. + * @param taskNameHash Task name hash code. */ public GridDhtAtomicUpdateRequest( int cacheId, @@ -204,7 +204,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param val Value, {@code null} if should be removed. * @param valBytes Value bytes, {@code null} if should be removed. * @param transformC Transform closure. - * @param drTtl DR TTL (optional). + * @param ttl TTL (optional). * @param drExpireTime DR expire time (optional). * @param drVer DR version (optional). */ @@ -213,7 +213,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp @Nullable V val, @Nullable byte[] valBytes, IgniteClosure<V, V> transformC, - long drTtl, + long ttl, long drExpireTime, @Nullable GridCacheVersion drVer) { keys.add(key); @@ -240,17 +240,18 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp else if (drVers != null) drVers.add(drVer); - if (drTtl >= 0) { - if (drTtls == null) { - drTtls = new GridLongList(keys.size()); + if (ttl >= 0) { + if (ttls == null) { + ttls = new GridLongList(keys.size()); for (int i = 0; i < keys.size() - 1; i++) - drTtls.add(-1); + ttls.add(-1); } - - drTtls.add(drTtl); } + if (ttls != null) + ttls.add(ttl); + if (drExpireTime >= 0) { if (drExpireTimes == null) { drExpireTimes = new GridLongList(keys.size()); @@ -258,9 +259,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp for (int i = 0; i < keys.size() - 1; i++) drExpireTimes.add(-1); } + } + if (drExpireTimes != null) drExpireTimes.add(drExpireTime); - } } /** @@ -278,7 +280,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp @Nullable byte[] valBytes, IgniteClosure<V, V> transformC, long ttl, - long expireTime) { + long expireTime) + { if (nearKeys == null) { nearKeys = new ArrayList<>(); nearKeyBytes = new ArrayList<>(); @@ -313,9 +316,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp for (int i = 0; i < nearKeys.size() - 1; i++) nearTtls.add(-1); } + } + if (nearTtls != null) nearTtls.add(ttl); - } if (expireTime >= 0) { if (nearExpireTimes == null) { @@ -324,9 +328,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp for (int i = 0; i < nearKeys.size() - 1; i++) nearExpireTimes.add(-1); } + } + if (nearExpireTimes != null) nearExpireTimes.add(expireTime); - } } /** {@inheritDoc} */ @@ -548,21 +553,14 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp } /** - * @return DR TTLs. - */ - @Nullable public GridLongList drTtls() { - return drTtls; - } - - /** * @param idx Index. - * @return DR TTL. + * @return TTL. */ - public long drTtl(int idx) { - if (drTtls != null) { - assert idx >= 0 && idx < drTtls.size(); + public long ttl(int idx) { + if (ttls != null) { + assert idx >= 0 && idx < ttls.size(); - return drTtls.get(idx); + return ttls.get(idx); } return -1L; @@ -677,7 +675,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp _clone.vals = vals; _clone.valBytes = valBytes; _clone.drVers = drVers; - _clone.drTtls = drTtls; + _clone.ttls = ttls; _clone.drExpireTimes = drExpireTimes; _clone.syncMode = syncMode; _clone.nearKeys = nearKeys; @@ -718,7 +716,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp commState.idx++; case 4: - if (!commState.putLongList(drTtls)) + if (!commState.putLongList(ttls)) return false; commState.idx++; @@ -1001,7 +999,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp if (drTtls0 == LONG_LIST_NOT_READ) return false; - drTtls = drTtls0; + ttls = drTtls0; commState.idx++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 4750462..6b233e7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -163,8 +163,10 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param retval Return value require flag. * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. * @param cached Cached entry if keys size is 1. - * @param expiryPlc Expiry policy. + * @param expiryPlc Expiry policy explicitly specified for cache operation. * @param filter Entry filter. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. */ public GridNearAtomicUpdateFuture( GridCacheContext<K, V> cctx, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 1265edb..fde03bd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -492,7 +492,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im filterBytes = marshalFilter(filter, ctx); if (expiryPlc != null) - expiryPlcBytes = CU.marshal(ctx, new GridCacheExternalizableExpiryPolicy(expiryPlc)); + expiryPlcBytes = CU.marshal(ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index d1e93c9..7534fd5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -243,8 +243,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKeys(keys); - final GridCacheAccessExpiryPolicy expiry = - GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + final GetExpiryPolicy expiry = + GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java index 77e18cf..fa19607 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -180,6 +180,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { * @param ttl TTL. * @param expireTime Expire time. * @param nodeId Node ID. + * @param subjId Subject ID. + * @param taskName Task name. * @throws IgniteCheckedException If failed. */ private void processNearAtomicUpdateResponse( @@ -213,7 +215,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { valBytes, /*write-through*/false, /*retval*/false, - null, + /**expiry policy*/null, /*event*/true, /*metrics*/true, /*primary*/false, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java index 2df7506..83ac913 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -279,8 +279,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda IgniteTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (IgniteTxLocalEx<K, V>)tx : null; - final GridCacheAccessExpiryPolicy expiry = - GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + final GetExpiryPolicy expiry = + GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java index 63fde7e..62da5e4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java @@ -92,7 +92,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma private boolean deserializePortable; /** Expiry policy. */ - private GridCacheExpiryPolicy expiryPlc; + private IgniteCacheExpiryPolicy expiryPlc; /** * Empty constructor required for {@link Externalizable}. @@ -124,7 +124,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma @Nullable UUID subjId, String taskName, boolean deserializePortable, - @Nullable GridCacheExpiryPolicy expiryPlc + @Nullable IgniteCacheExpiryPolicy expiryPlc ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java index 17b97b7..f09d5c2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -129,6 +129,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param tx Transaction. * @param keys Keys to load. * @param filter Filter. + * @param deserializePortable Deserialize portable flag. * @return Future. */ IgniteFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index 75ddc95..88b6cfc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -562,6 +562,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { * @param swapOrOffheap {@code True} if swap of off-heap storage are enabled. * @param storeEnabled Store enabled flag. * @param clone {@code True} if returned values should be cloned. + * @param taskName Task name. + * @param deserializePortable Deserialize portable . * @return Key-value map. * @throws IgniteCheckedException If failed. */ @@ -592,8 +594,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (keyCheck) validateCacheKeys(keys); - final GridCacheAccessExpiryPolicy expiry = - GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + final GetExpiryPolicy expiry = + GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); boolean success = true; @@ -871,9 +873,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { * @param op Operation. * @param keys Keys. * @param vals Values. + * @param expiryPlc Expiry policy. * @param ver Cache version. * @param filter Optional filter. * @param subjId Subject ID. + * @param taskName Task name. * @throws GridCachePartialUpdateException If update failed. */ @SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"}) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783e5270/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java index 0cec41d..68e4b93 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java @@ -134,7 +134,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, /** Expiry policy. */ private ExpiryPolicy expiryPlc; - /** */ + /** Expiry policy transfer flag. */ private boolean transferExpiryPlc; /** @@ -730,17 +730,11 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } /** - * Marks expiry policy for transfer if it explicitly set and differs from default one. - */ - public void transferExpiryPolicyIfNeeded() { - transferExpiryPlc = expiryPlc != null && expiryPlc != ctx.expiry(); - } - - /** * @param ctx Context. + * @param transferExpiry {@code True} if expire policy should be marshalled. * @throws IgniteCheckedException If failed. */ - public void marshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + public void marshal(GridCacheSharedContext<K, V> ctx, boolean transferExpiry) throws IgniteCheckedException { // Do not serialize filters if they are null. if (depEnabled) { if (keyBytes == null) @@ -755,6 +749,9 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, filterBytes = CU.marshal(ctx, filters); } + if (transferExpiry) + transferExpiryPlc = expiryPlc != null && expiryPlc != this.ctx.expiry(); + val.marshal(ctx, context(), depEnabled); } @@ -837,7 +834,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, out.writeBoolean(grpLock); CU.writeVersion(out, drVer); - out.writeObject(transferExpiryPlc ? new GridCacheExternalizableExpiryPolicy(expiryPlc) : null); + out.writeObject(transferExpiryPlc ? new IgniteExternalizableExpiryPolicy(expiryPlc) : null); } /** {@inheritDoc} */