http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 1dae49e..99f5128 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -52,8 +52,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; /** * DHT atomic cache near update future. */ -public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> - implements GridCacheAtomicFuture<K, Object>{ +public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implements GridCacheAtomicFuture<Object> { /** */ private static final long serialVersionUID = 0L; @@ -64,10 +63,10 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> protected static IgniteLogger log; /** Cache context. */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext cctx; /** Cache. */ - private GridDhtAtomicCache<K, V> cache; + private GridDhtAtomicCache cache; /** Future ID. */ private volatile GridCacheVersion futVer; @@ -76,7 +75,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> private final GridCacheOperation op; /** Keys */ - private Collection<? extends K> keys; + private Collection<Object> keys; /** Values. 
*/ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @@ -87,7 +86,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> /** Conflict put values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Collection<GridCacheDrInfo<V>> conflictPutVals; + private Collection<GridCacheDrInfo<Object>> conflictPutVals; /** Conflict remove values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @@ -95,7 +94,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> /** Mappings. */ @GridToStringInclude - private final ConcurrentMap<UUID, GridNearAtomicUpdateRequest<K, V>> mappings; + private final ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings; /** Error. */ private volatile CachePartialUpdateCheckedException err; @@ -106,9 +105,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> /** Return value require flag. */ private final boolean retval; - /** Cached entry if keys size is 1. */ - private GridCacheEntryEx cached; - /** Expiry policy. */ private final ExpiryPolicy expiryPlc; @@ -116,7 +112,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> private long topVer; /** Optional filter. */ - private final IgnitePredicate<Cache.Entry<K, V>>[] filter; + private final IgnitePredicate<Cache.Entry<Object, Object>>[] filter; /** Write synchronization mode. */ private final CacheWriteSynchronizationMode syncMode; @@ -128,7 +124,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> private UUID singleNodeId; /** Single update request. */ - private GridNearAtomicUpdateRequest<K, V> singleReq; + private GridNearAtomicUpdateRequest singleReq; /** Raw return value flag. */ private boolean rawRetval; @@ -178,27 +174,25 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param conflictRmvVals Conflict remove values (optional). 
* @param retval Return value require flag. * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. - * @param cached Cached entry if keys size is 1. * @param expiryPlc Expiry policy explicitly specified for cache operation. * @param filter Entry filter. * @param subjId Subject ID. * @param taskNameHash Task name hash code. */ public GridNearAtomicUpdateFuture( - GridCacheContext<K, V> cctx, - GridDhtAtomicCache<K, V> cache, + GridCacheContext cctx, + GridDhtAtomicCache cache, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - Collection<? extends K> keys, + Collection<Object> keys, @Nullable Collection<?> vals, @Nullable Object[] invokeArgs, - @Nullable Collection<GridCacheDrInfo<V>> conflictPutVals, + @Nullable Collection<GridCacheDrInfo<Object>> conflictPutVals, @Nullable Collection<GridCacheVersion> conflictRmvVals, final boolean retval, final boolean rawRetval, - @Nullable GridCacheEntryEx cached, @Nullable ExpiryPolicy expiryPlc, - final IgnitePredicate<Cache.Entry<K, V>>[] filter, + final IgnitePredicate<Cache.Entry<Object, Object>>[] filter, UUID subjId, int taskNameHash ) { @@ -209,7 +203,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> assert vals == null || vals.size() == keys.size(); assert conflictPutVals == null || conflictPutVals.size() == keys.size(); assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); - assert cached == null || keys.size() == 1; assert subjId != null; this.cctx = cctx; @@ -222,7 +215,6 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> this.conflictPutVals = conflictPutVals; this.conflictRmvVals = conflictRmvVals; this.retval = retval; - this.cached = cached; this.expiryPlc = expiryPlc; this.filter = filter; this.subjId = subjId; @@ -266,7 +258,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } /** {@inheritDoc} */ - @Override public Collection<? 
extends K> keys() { + @Override public Collection<Object> keys() { return keys; } @@ -286,7 +278,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> return false; } - GridNearAtomicUpdateRequest<K, V> req = mappings.get(nodeId); + GridNearAtomicUpdateRequest req = mappings.get(nodeId); if (req != null) { addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " + @@ -354,10 +346,11 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param nodeId Node ID. * @param res Update response. */ - public void onResult(UUID nodeId, GridNearAtomicUpdateResponse<K, V> res) { + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { if (res.remapKeys() != null) { assert cctx.config().getAtomicWriteOrderMode() == PRIMARY; + // TODO IGNITE-51. mapOnTopology(res.remapKeys(), true, nodeId); return; @@ -380,7 +373,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } } else { - GridNearAtomicUpdateRequest<K, V> req = mappings.get(nodeId); + GridNearAtomicUpdateRequest req = mappings.get(nodeId); if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft. updateNear(req, res); @@ -411,11 +404,11 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param req Update request. * @param res Update response. 
*/ - private void updateNear(GridNearAtomicUpdateRequest<K, V> req, GridNearAtomicUpdateResponse<K, V> res) { + private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { if (!nearEnabled || !req.hasPrimary()) return; - GridNearAtomicCache<K, V> near = (GridNearAtomicCache<K, V>)cctx.dht().near(); + GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); near.processNearAtomicUpdateResponse(req, res); } @@ -427,7 +420,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param remap Boolean flag indicating if this is partial future remap. * @param oldNodeId Old node ID if remap. */ - private void mapOnTopology(final Collection<? extends K> keys, final boolean remap, final UUID oldNodeId) { + private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) { cache.topology().readLock(); GridDiscoveryTopologySnapshot snapshot = null; @@ -493,7 +486,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param oldNodeId Old node ID if was remap. */ private void map0(GridDiscoveryTopologySnapshot topSnapshot, - Collection<? extends K> keys, + Collection<?> keys, boolean remap, @Nullable UUID oldNodeId) { assert oldNodeId == null || remap; @@ -517,7 +510,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> log.debug("Assigned fast-map version for update on near node: " + updVer); if (keys.size() == 1 && !fastMap && (single == null || single)) { - K key = F.first(keys); + Object key = F.first(keys); Object val; GridCacheVersion conflictVer; @@ -533,7 +526,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } else if (conflictPutVals != null) { // Conflict PUT. 
- GridCacheDrInfo<V> conflictPutVal = F.first(conflictPutVals); + GridCacheDrInfo<Object> conflictPutVal = F.first(conflictPutVals); val = conflictPutVal.value(); conflictVer = conflictPutVal.version(); @@ -572,21 +565,19 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> throw err; } - if (cctx.portableEnabled()) { - key = (K)cctx.marshalToPortable(key); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - if (op != TRANSFORM) - val = cctx.marshalToPortable(val); - } + if (op != TRANSFORM) + val = cctx.toCacheObject(val); - Collection<ClusterNode> primaryNodes = mapKey(key, topVer, fastMap); + Collection<ClusterNode> primaryNodes = mapKey(cacheKey, topVer, fastMap); // One key and no backups. assert primaryNodes.size() == 1 : "Should be mapped to single node: " + primaryNodes; ClusterNode primary = F.first(primaryNodes); - GridNearAtomicUpdateRequest<K, V> req = new GridNearAtomicUpdateRequest<>( + GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( cctx.cacheId(), primary.id(), futVer, @@ -603,7 +594,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> subjId, taskNameHash); - req.addUpdateEntry(key, val, conflictTtl, conflictExpireTime, conflictVer, true); + req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, true); single = true; @@ -618,7 +609,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> if (vals != null) it = vals.iterator(); - Iterator<GridCacheDrInfo<V>> conflictPutValsIt = null; + Iterator<GridCacheDrInfo<Object>> conflictPutValsIt = null; if (conflictPutVals != null) conflictPutValsIt = conflictPutVals.iterator(); @@ -628,7 +619,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> if (conflictRmvVals != null) conflictRmvValsIt = conflictRmvVals.iterator(); - Map<UUID, GridNearAtomicUpdateRequest<K, V>> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); + Map<UUID, 
GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. @@ -637,7 +628,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> removeMapping(oldNodeId); // Create mappings first, then send messages. - for (K key : keys) { + for (Object key : keys) { if (key == null) { NullPointerException err = new NullPointerException("Null key."); @@ -666,7 +657,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } } else if (conflictPutVals != null) { - GridCacheDrInfo<V> conflictPutVal = conflictPutValsIt.next(); + GridCacheDrInfo<Object> conflictPutVal = conflictPutValsIt.next(); val = conflictPutVal.value(); conflictVer = conflictPutVal.version(); @@ -689,24 +680,22 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> if (val == null && op != GridCacheOperation.DELETE) continue; - if (cctx.portableEnabled()) { - key = (K)cctx.marshalToPortable(key); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - if (op != TRANSFORM) - val = cctx.marshalToPortable(val); - } + if (op != TRANSFORM) + val = cctx.toCacheObject(val); - Collection<ClusterNode> affNodes = mapKey(key, topVer, fastMap); + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); int i = 0; for (ClusterNode affNode : affNodes) { UUID nodeId = affNode.id(); - GridNearAtomicUpdateRequest<K, V> mapped = pendingMappings.get(nodeId); + GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); if (mapped == null) { - mapped = new GridNearAtomicUpdateRequest<>( + mapped = new GridNearAtomicUpdateRequest( cctx.cacheId(), nodeId, futVer, @@ -725,13 +714,13 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> pendingMappings.put(nodeId, mapped); - GridNearAtomicUpdateRequest<K, V> old = 
mappings.put(nodeId, mapped); + GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped); assert old == null || (old != null && remap) : "Invalid mapping state [old=" + old + ", remap=" + remap + ']'; } - mapped.addUpdateEntry(key, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); + mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); i++; } @@ -739,7 +728,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } if ((single == null || single) && pendingMappings.size() == 1) { - Map.Entry<UUID, GridNearAtomicUpdateRequest<K, V>> entry = F.first(pendingMappings.entrySet()); + Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); single = true; @@ -762,8 +751,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. * @return Collection of nodes to which key is mapped. */ - private Collection<ClusterNode> mapKey(K key, long topVer, boolean fastMap) { - GridCacheAffinityManager<K, V> affMgr = cctx.affinity(); + private Collection<ClusterNode> mapKey(KeyCacheObject key, long topVer, boolean fastMap) { + GridCacheAffinityManager affMgr = cctx.affinity(); // If we can send updates in parallel - do it. return fastMap ? @@ -777,15 +766,15 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param nodeId Node ID. * @param req Request. 
*/ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest<K, V> req) { + private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { singleNodeId = nodeId; singleReq = req; if (ctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(nodeId, req, - new CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>>() { - @Override public void apply(GridNearAtomicUpdateRequest<K, V> req, - GridNearAtomicUpdateResponse<K, V> res) { + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res) { assert res.futureVersion().equals(futVer); onResult(res.nodeId(), res); @@ -800,7 +789,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) - onDone(new GridCacheReturn<V>(null, true)); + onDone(new GridCacheReturn<Object>(null, true)); } catch (IgniteCheckedException e) { onDone(addFailedKeys(req.keys(), e)); @@ -813,13 +802,13 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * * @param mappings Mappings to send. */ - private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest<K, V>> mappings) { + private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { UUID locNodeId = cctx.localNodeId(); - GridNearAtomicUpdateRequest<K, V> locUpdate = null; + GridNearAtomicUpdateRequest locUpdate = null; // Send messages to remote nodes first, then run local update. 
- for (GridNearAtomicUpdateRequest<K, V> req : mappings.values()) { + for (GridNearAtomicUpdateRequest req : mappings.values()) { if (locNodeId.equals(req.nodeId())) { assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + ", req=" + req + ']'; @@ -850,9 +839,9 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> if (locUpdate != null) { cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>>() { - @Override public void apply(GridNearAtomicUpdateRequest<K, V> req, - GridNearAtomicUpdateResponse<K, V> res) { + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res) { assert res.futureVersion().equals(futVer); onResult(res.nodeId(), res); @@ -895,12 +884,13 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param err Error cause. * @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}. */ - private synchronized IgniteCheckedException addFailedKeys(Collection<K> failedKeys, Throwable err) { + private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys, Throwable err) { CachePartialUpdateCheckedException err0 = this.err; if (err0 == null) err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + // TODO IGNITE-51. err0.add(failedKeys, err); return err0;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java index b19a2da..c95e2e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; /** * Cache entry for colocated cache. */ -public class GridDhtColocatedCacheEntry<K, V> extends GridDhtCacheEntry<K, V> { +public class GridDhtColocatedCacheEntry extends GridDhtCacheEntry { /** * @param ctx Cache context. * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). @@ -35,8 +35,15 @@ public class GridDhtColocatedCacheEntry<K, V> extends GridDhtCacheEntry<K, V> { * @param ttl Time to live. * @param hdrId Header id. 
*/ - public GridDhtColocatedCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val, - GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + public GridDhtColocatedCacheEntry(GridCacheContext ctx, + long topVer, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { super(ctx, topVer, key, hash, val, next, ttl, hdrId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index 549e552..944034c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -27,7 +27,7 @@ import org.jetbrains.annotations.*; /** * Detached cache entry. */ -public class GridDhtDetachedCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { +public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { /** * @param ctx Cache context. * @param key Cache key. @@ -37,8 +37,8 @@ public class GridDhtDetachedCacheEntry<K, V> extends GridDistributedCacheEntry<K * @param ttl Time to live. * @param hdrId Header ID. 
*/ - public GridDhtDetachedCacheEntry(GridCacheContext<K, V> ctx, K key, int hash, V val, - GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + public GridDhtDetachedCacheEntry(GridCacheContext ctx, KeyCacheObject key, int hash, CacheObject val, + GridCacheMapEntry next, long ttl, int hdrId) { super(ctx, key, hash, val, next, ttl, hdrId); } @@ -50,7 +50,7 @@ public class GridDhtDetachedCacheEntry<K, V> extends GridDistributedCacheEntry<K * @param ver Version. * @throws IgniteCheckedException If value unmarshalling failed. */ - public void resetFromPrimary(V val, byte[] valBytes, GridCacheVersion ver) + public void resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver) throws IgniteCheckedException { if (valBytes != null && val == null) val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); @@ -61,30 +61,31 @@ public class GridDhtDetachedCacheEntry<K, V> extends GridDistributedCacheEntry<K } /** {@inheritDoc} */ - @Nullable @Override public V unswap(boolean ignoreFlags, boolean needVal) throws IgniteCheckedException { + @Nullable @Override public CacheObject unswap(boolean ignoreFlags, boolean needVal) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override protected void value(@Nullable V val, @Nullable byte[] valBytes) { + @Override protected void value(@Nullable CacheObject val, @Nullable byte[] valBytes) { this.val = val; - this.valBytes = valBytes; } /** {@inheritDoc} */ @Override protected GridCacheValueBytes valueBytesUnlocked() { - return (val != null && val instanceof byte[]) ? GridCacheValueBytes.plain(val) : - valBytes == null ? GridCacheValueBytes.nil() : GridCacheValueBytes.marshaled(valBytes); + return null; +// TODO IGNITE-51. +// return (val != null && val instanceof byte[]) ? GridCacheValueBytes.plain(val) : +// valBytes == null ? 
GridCacheValueBytes.nil() : GridCacheValueBytes.marshaled(valBytes); } /** {@inheritDoc} */ - @Override protected void updateIndex(V val, byte[] valBytes, long expireTime, - GridCacheVersion ver, V old) throws IgniteCheckedException { + @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime, + GridCacheVersion ver, CacheObject old) throws IgniteCheckedException { // No-op for detached entries, index is updated on primary nodes. } /** {@inheritDoc} */ - @Override protected void clearIndex(V val) throws IgniteCheckedException { + @Override protected void clearIndex(CacheObject val) throws IgniteCheckedException { // No-op for detached entries, index is updated on primary or backup nodes. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 306d929..2642c5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -282,7 +282,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda if (keyCheck) validateCacheKeys(keys); - IgniteTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (IgniteTxLocalEx<K, V>)tx : null; + IgniteTxLocalEx txx = (tx != null && tx.local()) ? 
(IgniteTxLocalEx)tx : null; final IgniteCacheExpiryPolicy expiry = expiryPolicy(expiryPlc); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index ce05f76..52d5010 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -39,7 +39,7 @@ import static org.apache.ignite.events.EventType.*; * Near cache entry. */ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope"}) -public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { +public class GridNearCacheEntry extends GridDistributedCacheEntry { /** */ private static final int NEAR_SIZE_OVERHEAD = 36; @@ -62,8 +62,14 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @param ttl Time to live. * @param hdrId Header id. 
*/ - public GridNearCacheEntry(GridCacheContext<K, V> ctx, K key, int hash, V val, GridCacheMapEntry<K, V> next, - long ttl, int hdrId) { + public GridNearCacheEntry(GridCacheContext ctx, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { super(ctx, key, hash, val, next, ttl, hdrId); part = ctx.affinity().partition(key); @@ -118,10 +124,10 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { */ public boolean initializeFromDht(long topVer) throws GridCacheEntryRemovedException { while (true) { - GridDhtCacheEntry<K, V> entry = cctx.near().dht().peekExx(key); + GridDhtCacheEntry entry = cctx.near().dht().peekExx(key); if (entry != null) { - GridCacheEntryInfo<K, V> e = entry.info(); + GridCacheEntryInfo e = entry.info(); if (e != null) { GridCacheVersion enqueueVer = null; @@ -132,10 +138,10 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { if (isNew() || !valid(topVer)) { // Version does not change for load ops. - update(e.value(), e.valueBytes(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version()); + update(e.value(), null, e.expireTime(), e.ttl(), e.isNew() ? ver : e.version()); if (cctx.deferredDelete()) { - boolean deleted = val == null && valBytes == null; + boolean deleted = val == null; if (deleted != deletedUnlocked()) { deletedUnlocked(deleted); @@ -179,18 +185,19 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings( {"RedundantTypeArguments"}) - public boolean resetFromPrimary(V val, byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer, + public boolean resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer, UUID primaryNodeId) throws GridCacheEntryRemovedException, IgniteCheckedException { assert dhtVer != null; cctx.versions().onReceived(primaryNodeId, dhtVer); - if (valBytes != null && val == null && !cctx.config().isStoreValueBytes()) { - GridCacheVersion curDhtVer = dhtVersion(); - - if (!F.eq(dhtVer, curDhtVer)) - val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); - } +// TODO IGNITE-51. +// if (valBytes != null && val == null && !cctx.config().isStoreValueBytes()) { +// GridCacheVersion curDhtVer = dhtVersion(); +// +// if (!F.eq(dhtVer, curDhtVer)) +// val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); +// } synchronized (this) { checkObsolete(); @@ -220,8 +227,13 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @param ttl Time to live. * @param primaryNodeId Primary node ID. */ - public void updateOrEvict(GridCacheVersion dhtVer, @Nullable V val, @Nullable byte[] valBytes, long expireTime, - long ttl, UUID primaryNodeId) { + public void updateOrEvict(GridCacheVersion dhtVer, + @Nullable CacheObject val, + @Nullable byte[] valBytes, + long expireTime, + long ttl, + UUID primaryNodeId) + { assert dhtVer != null; cctx.versions().onReceived(primaryNodeId, dhtVer); @@ -261,26 +273,27 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @return Tuple with version and value of this entry. * @throws GridCacheEntryRemovedException If entry has been removed. 
*/ - @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue() + @Nullable public synchronized GridTuple3<GridCacheVersion, CacheObject, byte[]> versionedValue() throws GridCacheEntryRemovedException { checkObsolete(); if (dhtVer == null) return null; else { - V val0 = null; + CacheObject val0 = null; byte[] valBytes0 = null; - GridCacheValueBytes valBytesTuple = valueBytes(); - - if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V)valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); - } - else - val0 = val; +// TODO IGNITE-51. +// GridCacheValueBytes valBytesTuple = valueBytes(); +// +// if (!valBytesTuple.isNull()) { +// if (valBytesTuple.isPlain()) +// val0 = (V)valBytesTuple.get(); +// else +// valBytes0 = valBytesTuple.get(); +// } +// else +// val0 = val; return F.t(dhtVer, val0, valBytes0); } @@ -312,17 +325,19 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { } /** {@inheritDoc} */ - @Override protected V readThrough(IgniteInternalTx tx, K key, boolean reload, + @Override protected Object readThrough(IgniteInternalTx tx, KeyCacheObject key, boolean reload, UUID subjId, String taskName) throws IgniteCheckedException { - return cctx.near().loadAsync(tx, - F.asList(key), - reload, - /*force primary*/false, - subjId, - taskName, - true, - null, - false).get().get(key); + return null; +// TODO IGNITE-51. +// return cctx.near().loadAsync(tx, +// F.asList(key), +// reload, +// /*force primary*/false, +// subjId, +// taskName, +// true, +// null, +// false).get().get(key); } /** @@ -343,14 +358,24 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * @throws GridCacheEntryRemovedException If entry was removed.
*/ @SuppressWarnings({"RedundantTypeArguments"}) - public boolean loadedValue(@Nullable IgniteInternalTx tx, UUID primaryNodeId, V val, byte[] valBytes, - GridCacheVersion ver, GridCacheVersion dhtVer, @Nullable GridCacheVersion expVer, long ttl, long expireTime, - boolean evt, long topVer, UUID subjId) + public boolean loadedValue(@Nullable IgniteInternalTx tx, + UUID primaryNodeId, + CacheObject val, + byte[] valBytes, + GridCacheVersion ver, + GridCacheVersion dhtVer, + @Nullable GridCacheVersion expVer, + long ttl, + long expireTime, + boolean evt, + long topVer, + UUID subjId) throws IgniteCheckedException, GridCacheEntryRemovedException { boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); - if (valBytes != null && val == null && (isNewLocked() || !valid)) - val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); +// TODO IGNITE-51. +// if (valBytes != null && val == null && (isNewLocked() || !valid)) +// val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); GridCacheVersion enqueueVer = null; @@ -363,7 +388,7 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { boolean ret = false; - V old = this.val; + CacheObject old = this.val; boolean hasVal = hasValueUnlocked(); if (isNew() || !valid || expVer == null || expVer.equals(this.dhtVer)) { @@ -404,13 +429,13 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { } /** {@inheritDoc} */ - @Override protected void updateIndex(V val, byte[] valBytes, long expireTime, - GridCacheVersion ver, V old) throws IgniteCheckedException { + @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime, + GridCacheVersion ver, CacheObject old) throws IgniteCheckedException { // No-op: queries are disabled for near cache. 
} /** {@inheritDoc} */ - @Override protected void clearIndex(V val) { + @Override protected void clearIndex(CacheObject val) { // No-op. } @@ -459,7 +484,7 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { GridCacheMvccCandidate owner; GridCacheMvccCandidate cand; - V val; + CacheObject val; UUID locId = cctx.nodeId(); @@ -469,7 +494,7 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { GridCacheMvcc mvcc = mvccExtras(); if (mvcc == null) { - mvcc = new GridCacheMvcc<>(cctx); + mvcc = new GridCacheMvcc(cctx); mvccExtras(mvcc); } @@ -539,7 +564,7 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { GridCacheMvccCandidate prev = null; GridCacheMvccCandidate owner = null; - V val; + CacheObject val; UUID locId = cctx.nodeId(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index a281cfc..db8038d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -28,7 +28,7 @@ import static org.apache.ignite.events.EventType.*; * Cache entry for local caches. */ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope"}) -public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { +public class GridLocalCacheEntry extends GridCacheMapEntry { /** * @param ctx Cache registry. * @param key Cache key. 
@@ -38,8 +38,14 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { * @param ttl Time to live. * @param hdrId Header id. */ - public GridLocalCacheEntry(GridCacheContext<K, V> ctx, K key, int hash, V val, - GridCacheMapEntry<K, V> next, long ttl, int hdrId) { + public GridLocalCacheEntry(GridCacheContext ctx, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { super(ctx, key, hash, val, next, ttl, hdrId); } @@ -71,7 +77,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { GridCacheMvccCandidate cand; GridCacheMvccCandidate owner; - V val; + CacheObject val; boolean hasVal; synchronized (this) { @@ -80,7 +86,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { GridCacheMvcc mvcc = mvccExtras(); if (mvcc == null) { - mvcc = new GridCacheMvcc<>(cctx); + mvcc = new GridCacheMvcc(cctx); mvccExtras(mvcc); } @@ -254,8 +260,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { // Allow next lock in the thread to proceed. if (!cand.used()) { - GridLocalCacheEntry<K, V> e = - (GridLocalCacheEntry<K, V>)cctx.cache().peekEx(cand.key()); + GridLocalCacheEntry e = (GridLocalCacheEntry)cctx.cache().peekEx(cand.key()); // At this point candidate may have been removed and entry destroyed, // so we check for null. 
@@ -293,7 +298,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { GridCacheMvccCandidate prev = null; GridCacheMvccCandidate owner = null; - V val; + CacheObject val; boolean hasVal; synchronized (this) { @@ -342,7 +347,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { GridCacheMvccCandidate doomed; - V val; + CacheObject val; boolean hasVal; synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index a2d69fb..fdc4a7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -992,17 +992,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * * @param tx Committed transaction. */ - public <K, V> void onTxCommitted(IgniteInternalTx<K, V> tx) { + public <K, V> void onTxCommitted(IgniteInternalTx tx) { if (dsCacheCtx == null) return; if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) { - Collection<IgniteTxEntry<K, V>> entries = tx.writeEntries(); + Collection<IgniteTxEntry> entries = tx.writeEntries(); if (log.isDebugEnabled()) log.debug("Committed entries: " + entries); - for (IgniteTxEntry<K, V> entry : entries) { + for (IgniteTxEntry entry : entries) { // Check updated or created GridCacheInternalKey keys. 
if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key() instanceof GridCacheInternalKey) { GridCacheInternal key = (GridCacheInternal)entry.key(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b86bdec/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java index 5f7b826..00ee851 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java @@ -68,7 +68,7 @@ public class GridHashMapLoadTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testMapEntry() throws Exception { - Map<Integer, GridCacheMapEntry<Integer, Integer>> map = new HashMap<>(5 * 1024 * 1024); + Map<Integer, GridCacheMapEntry> map = new HashMap<>(5 * 1024 * 1024); int i = 0; @@ -79,13 +79,13 @@ public class GridHashMapLoadTest extends GridCommonAbstractTest { Integer key = i++; Integer val = i++; - map.put(key, new GridCacheMapEntry<Integer, Integer>(ctx, key, - key.hashCode(), val, null, 0, 1) { - @Override public boolean tmLock(IgniteInternalTx<Integer, Integer> tx, long timeout) { + map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key), + key.hashCode(), ctx.toCacheObject(val), null, 0, 1) { + @Override public boolean tmLock(IgniteInternalTx tx, long timeout) { return false; } - @Override public void txUnlock(IgniteInternalTx<Integer, Integer> tx) { + @Override public void txUnlock(IgniteInternalTx tx) { // No-op. }