#ignite-743: make StreamReceiver use IgniteCacheProxyLockFree.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dbcb60a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dbcb60a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dbcb60a1 Branch: refs/heads/ignite-755 Commit: dbcb60a134d6f919d8b0e95c96a7730705fed75f Parents: b5d6303 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Wed Apr 15 11:47:21 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Wed Apr 15 11:47:21 2015 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamerCacheUpdaters.java | 2 +- .../datastreamer/DataStreamerImpl.java | 83 +++++++++----------- 2 files changed, 39 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dbcb60a1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java index ed51730..fc6783d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java @@ -85,7 +85,7 @@ public class DataStreamerCacheUpdaters { // Here we assume that there are no key duplicates, so the following calls are valid. 
if (rmvCol != null) - ((IgniteCacheProxyLockFree<K, V>)cache).removeAll(rmvCol); + cache.removeAll(rmvCol); if (putMap != null) cache.putAll(putMap); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dbcb60a1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index a03a925..83fb3cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1373,69 +1373,62 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** {@inheritDoc} */ @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache, Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) { - IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache; + IgniteCacheProxyLockFree<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxyLockFree<KeyCacheObject, CacheObject>)cache; - proxy.gate().enter(); + GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache(); - try { - GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache(); - - if (internalCache.isNear()) - internalCache = internalCache.context().near().dht(); + if (internalCache.isNear()) + internalCache = internalCache.context().near().dht(); - GridCacheContext cctx = internalCache.context(); + GridCacheContext cctx = internalCache.context(); - AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - 
GridCacheVersion ver = cctx.versions().next(topVer); + GridCacheVersion ver = cctx.versions().next(topVer); - long ttl = CU.TTL_ETERNAL; - long expiryTime = CU.EXPIRE_TIME_ETERNAL; + long ttl = CU.TTL_ETERNAL; + long expiryTime = CU.EXPIRE_TIME_ETERNAL; - ExpiryPolicy plc = cctx.expiry(); + ExpiryPolicy plc = cctx.expiry(); - for (Entry<KeyCacheObject, CacheObject> e : entries) { - try { - e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); + for (Entry<KeyCacheObject, CacheObject> e : entries) { + try { + e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); - GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer); + GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer); - entry.unswap(false); + entry.unswap(false); - if (plc != null) { - ttl = CU.toTtl(plc.getExpiryForCreation()); + if (plc != null) { + ttl = CU.toTtl(plc.getExpiryForCreation()); - if (ttl == CU.TTL_ZERO) - continue; - else if (ttl == CU.TTL_NOT_CHANGED) - ttl = 0; + if (ttl == CU.TTL_ZERO) + continue; + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; - expiryTime = CU.toExpireTime(ttl); - } + expiryTime = CU.toExpireTime(ttl); + } - entry.initialValue(e.getValue(), - ver, - ttl, - expiryTime, - false, - topVer, - GridDrType.DR_LOAD); + entry.initialValue(e.getValue(), + ver, + ttl, + expiryTime, + false, + topVer, + GridDrType.DR_LOAD); - cctx.evicts().touch(entry, topVer); - } - catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { - // No-op. - } - catch (IgniteCheckedException ex) { - IgniteLogger log = cache.unwrap(Ignite.class).log(); + cctx.evicts().touch(entry, topVer); + } + catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { + // No-op. 
+ } + catch (IgniteCheckedException ex) { + IgniteLogger log = cache.unwrap(Ignite.class).log(); - U.error(log, "Failed to set initial value for cache entry: " + e, ex); - } + U.error(log, "Failed to set initial value for cache entry: " + e, ex); } } - finally { - proxy.gate().leave(); - } } } }