#ignite-743: add StreamReceiver uses IgniteCacheProxyLockFree.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dbcb60a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dbcb60a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dbcb60a1

Branch: refs/heads/ignite-758
Commit: dbcb60a134d6f919d8b0e95c96a7730705fed75f
Parents: b5d6303
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Wed Apr 15 11:47:21 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Wed Apr 15 11:47:21 2015 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerCacheUpdaters.java |  2 +-
 .../datastreamer/DataStreamerImpl.java          | 83 +++++++++-----------
 2 files changed, 39 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dbcb60a1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
index ed51730..fc6783d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
@@ -85,7 +85,7 @@ public class DataStreamerCacheUpdaters {
 
         // Here we assume that there are no key duplicates, so the following 
calls are valid.
         if (rmvCol != null)
-            ((IgniteCacheProxyLockFree<K, V>)cache).removeAll(rmvCol);
+            cache.removeAll(rmvCol);
 
         if (putMap != null)
             cache.putAll(putMap);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dbcb60a1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a03a925..83fb3cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1373,69 +1373,62 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         /** {@inheritDoc} */
         @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> 
cache,
             Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
-            IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = 
(IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
+            IgniteCacheProxyLockFree<KeyCacheObject, CacheObject> proxy = 
(IgniteCacheProxyLockFree<KeyCacheObject, CacheObject>)cache;
 
-            proxy.gate().enter();
+            GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = 
proxy.context().cache();
 
-            try {
-                GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = 
proxy.context().cache();
-
-                if (internalCache.isNear())
-                    internalCache = internalCache.context().near().dht();
+            if (internalCache.isNear())
+                internalCache = internalCache.context().near().dht();
 
-                GridCacheContext cctx = internalCache.context();
+            GridCacheContext cctx = internalCache.context();
 
-                AffinityTopologyVersion topVer = 
cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion topVer = 
cctx.affinity().affinityTopologyVersion();
 
-                GridCacheVersion ver = cctx.versions().next(topVer);
+            GridCacheVersion ver = cctx.versions().next(topVer);
 
-                long ttl = CU.TTL_ETERNAL;
-                long expiryTime = CU.EXPIRE_TIME_ETERNAL;
+            long ttl = CU.TTL_ETERNAL;
+            long expiryTime = CU.EXPIRE_TIME_ETERNAL;
 
-                ExpiryPolicy plc = cctx.expiry();
+            ExpiryPolicy plc = cctx.expiry();
 
-                for (Entry<KeyCacheObject, CacheObject> e : entries) {
-                    try {
-                        e.getKey().finishUnmarshal(cctx.cacheObjectContext(), 
cctx.deploy().globalLoader());
+            for (Entry<KeyCacheObject, CacheObject> e : entries) {
+                try {
+                    e.getKey().finishUnmarshal(cctx.cacheObjectContext(), 
cctx.deploy().globalLoader());
 
-                        GridCacheEntryEx entry = 
internalCache.entryEx(e.getKey(), topVer);
+                    GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), 
topVer);
 
-                        entry.unswap(false);
+                    entry.unswap(false);
 
-                        if (plc != null) {
-                            ttl = CU.toTtl(plc.getExpiryForCreation());
+                    if (plc != null) {
+                        ttl = CU.toTtl(plc.getExpiryForCreation());
 
-                            if (ttl == CU.TTL_ZERO)
-                                continue;
-                            else if (ttl == CU.TTL_NOT_CHANGED)
-                                ttl = 0;
+                        if (ttl == CU.TTL_ZERO)
+                            continue;
+                        else if (ttl == CU.TTL_NOT_CHANGED)
+                            ttl = 0;
 
-                            expiryTime = CU.toExpireTime(ttl);
-                        }
+                        expiryTime = CU.toExpireTime(ttl);
+                    }
 
-                        entry.initialValue(e.getValue(),
-                            ver,
-                            ttl,
-                            expiryTime,
-                            false,
-                            topVer,
-                            GridDrType.DR_LOAD);
+                    entry.initialValue(e.getValue(),
+                        ver,
+                        ttl,
+                        expiryTime,
+                        false,
+                        topVer,
+                        GridDrType.DR_LOAD);
 
-                        cctx.evicts().touch(entry, topVer);
-                    }
-                    catch (GridDhtInvalidPartitionException | 
GridCacheEntryRemovedException ignored) {
-                        // No-op.
-                    }
-                    catch (IgniteCheckedException ex) {
-                        IgniteLogger log = cache.unwrap(Ignite.class).log();
+                    cctx.evicts().touch(entry, topVer);
+                }
+                catch (GridDhtInvalidPartitionException | 
GridCacheEntryRemovedException ignored) {
+                    // No-op.
+                }
+                catch (IgniteCheckedException ex) {
+                    IgniteLogger log = cache.unwrap(Ignite.class).log();
 
-                        U.error(log, "Failed to set initial value for cache 
entry: " + e, ex);
-                    }
+                    U.error(log, "Failed to set initial value for cache entry: 
" + e, ex);
                 }
             }
-            finally {
-                proxy.gate().leave();
-            }
         }
     }
 }

Reply via email to