Repository: incubator-ignite Updated Branches: refs/heads/ignite-611 1b025202b -> 30a6b127f
#ignite-611: revert. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/30a6b127 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/30a6b127 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/30a6b127 Branch: refs/heads/ignite-611 Commit: 30a6b127fe7e05c1b6666543a9697d142f413a04 Parents: 1b02520 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Mar 26 19:53:18 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Mar 26 19:53:18 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 52 +++++++++++++++++ .../processors/cache/GridCacheProcessor.java | 5 +- .../cache/GridCacheProjectionImpl.java | 59 ++++++++++++++++++++ .../CacheDataStructuresManager.java | 4 +- .../GridProjectionForCachesSelfTest.java | 14 ++--- ...achePartitionedMultiNodeFullApiSelfTest.java | 30 ++++++++-- .../hadoop/jobtracker/HadoopJobTracker.java | 7 ++- 7 files changed, 153 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a6b127/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 8c97888..8d3024b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -465,6 +465,58 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "RedundantCast"}) + public <K1, V1> CacheProjection<K1, V1> projection( + Class<? super K1> keyType, + Class<? super V1> valType + ) { + if (ctx.deploymentEnabled()) { + try { + ctx.deploy().registerClasses(keyType, valType); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>((CacheProjection<K1, V1>)this, + (GridCacheContext<K1, V1>)ctx, + CU.typeFilter0(keyType, valType), + /*flags*/null, + /*clientId*/null, + false, + null); + + return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj); + } + + /** {@inheritDoc} */ + public CacheProjection<K, V> projection(CacheEntryPredicate filter) { + if (filter == null) + return this; + + if (ctx.deploymentEnabled()) { + try { + ctx.deploy().registerClasses(filter); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>( + this, + ctx, + filter, + null, + null, + false, + null); + + return new GridCacheProxyImpl<>(ctx, prj, prj); + } + + /** {@inheritDoc} */ @Override public CacheConfiguration configuration() { return ctx.config(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a6b127/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index caeead6..8dfa08a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2189,8 +2189,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Projection over utility cache. */ public <K extends GridCacheUtilityKey, V> GridCacheProjectionEx<K, V> utilityCache(Class<K> keyCls, Class<V> valCls) { - GridCache<K, V> cache = cache(CU.UTILITY_CACHE_NAME); - return (GridCacheProjectionEx<K, V>)cache; + GridCacheAdapter<K, V> cache = internalCache(CU.UTILITY_CACHE_NAME); + + return (GridCacheProjectionEx<K, V>)cache.projection(keyCls, valCls); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a6b127/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 4a7e3ca..6a1f8fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -279,6 +279,65 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @SuppressWarnings( {"unchecked", "RedundantCast"}) + public <K1, V1> CacheProjection<K1, V1> projection( + Class<? super K1> keyType, + Class<? super V1> valType + ) { + A.notNull(keyType, "keyType", valType, "valType"); + + if (cctx.deploymentEnabled()) { + try { + cctx.deploy().registerClasses(keyType, valType); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>( + (CacheProjection<K1, V1>)this, + (GridCacheContext<K1, V1>)cctx, + CU.typeFilter0(keyType, valType), + flags, + subjId, + keepPortable, + expiryPlc); + + return new GridCacheProxyImpl((GridCacheContext<K1, V1>)cctx, prj, prj); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + public CacheProjection<K, V> projection(CacheEntryPredicate filter) { + if (filter == null) + return new GridCacheProxyImpl<>(cctx, this, this); + + if (this.filter != null) + filter = and(filter); + + if (cctx.deploymentEnabled()) { + try { + cctx.deploy().registerClasses(filter); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + cctx, + filter, + flags, + subjId, + keepPortable, + expiryPlc); + + return new GridCacheProxyImpl<>(cctx, prj, prj); + } + + + /** {@inheritDoc} */ @Override public CacheProjection<K, V> flagsOn(@Nullable CacheFlag[] flags) { if (F.isEmpty(flags)) return new GridCacheProxyImpl<>(cctx, this, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a6b127/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index ac48b03..2de56b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -56,7 +56,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { private final ConcurrentMap<IgniteUuid, GridCacheQueueProxy> queuesMap; /** Queue header view. */ - private GridCache<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; + private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; /** Query notifying about queue update. */ private UUID queueQryId; @@ -85,7 +85,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void onKernalStart0() throws IgniteCheckedException { try { - queueHdrView = cctx.grid().cachex(cctx.name()); + queueHdrView = cctx.cache().projection(GridCacheQueueHeaderKey.class, GridCacheQueueHeader.class); initFlag = true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a6b127/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java index a12435e..86295ba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java @@ -121,13 +121,13 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest { public void testProjectionForDefaultCache() throws Exception { ClusterGroup prj = ignite.cluster().forCacheNodes(null); - assert prj != null; - assert prj.nodes().size() == 3; - assert prj.nodes().contains(grid(0).localNode()); - assert !prj.nodes().contains(grid(1).localNode()); - assert prj.nodes().contains(grid(2).localNode()); - assert prj.nodes().contains(grid(3).localNode()); - assert !prj.nodes().contains(grid(4).localNode()); + assertNotNull(prj); + assertEquals(3, prj.nodes().size()); + assertTrue(prj.nodes().contains(grid(0).localNode())); + assertFalse(prj.nodes().contains(grid(1).localNode())); + assertTrue(prj.nodes().contains(grid(2).localNode())); + assertTrue(prj.nodes().contains(grid(3).localNode())); + assertTrue(prj.nodes().contains(grid(4).localNode())); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a6b127/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java index 0c1977e..5b78469 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java @@ -298,7 +298,7 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti info("Generating keys for test..."); - GridCache<String, Integer> cache0 = cache(0); + GridCacheAdapter<String, Integer> cache0 = ((IgniteKernal)grid(0)).internalCache(); for (int i = 0; i < 5; i++) { while (true) { @@ -308,7 +308,7 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti ignite(0).affinity(null).isBackup(grid(1).localNode(), key)) { keys.add(key); - assertTrue(cache0.putx(key, i)); + cache0.put(key, i); break; } @@ -317,7 +317,7 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti info("Finished generating keys for test."); - GridCache<String, Integer> cache2 = cache(2); + GridCacheAdapter<String, Integer> cache2 = ((IgniteKernal)grid(2)).internalCache(); assertEquals(Integer.valueOf(0), cache2.get(keys.get(0))); assertEquals(Integer.valueOf(1), cache2.get(keys.get(1))); @@ -325,13 +325,35 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti assertEquals(0, cache0.nearSize()); assertEquals(5, cache0.size() - cache0.nearSize()); - GridCache<String, Integer> cache1 = cache(1); + GridCacheAdapter<String, Integer> cache1 = ((IgniteKernal)grid(1)).internalCache(); assertEquals(0, cache1.nearSize()); assertEquals(5, cache1.size() - cache1.nearSize()); assertEquals(nearEnabled() ? 2 : 0, cache2.nearSize()); assertEquals(0, cache2.size() - cache2.nearSize()); + + CacheEntryPredicate prjFilter = new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + try { + Integer val = CU.value(e.rawGetOrUnmarshal(false), e.context(), false); + + return val != null && val >= 1 && val <= 3; + } + catch (IgniteCheckedException err) { + throw new IgniteException(err); + } + } + }; + + assertEquals(0, cache0.projection(prjFilter).nearSize()); + assertEquals(3, cache0.projection(prjFilter).size() - cache0.projection(prjFilter).nearSize()); + + assertEquals(0, cache1.projection(prjFilter).nearSize()); + assertEquals(3, cache1.projection(prjFilter).size() - cache1.projection(prjFilter).nearSize()); + + assertEquals(nearEnabled() ? 1 : 0, cache2.projection(prjFilter).nearSize()); + assertEquals(0, cache2.projection(prjFilter).size() - cache2.projection(prjFilter).nearSize()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30a6b127/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 973b731..a420f23 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -57,7 +57,7 @@ public class HadoopJobTracker extends HadoopComponent { private final GridMutex mux = new GridMutex(); /** */ - private volatile GridCacheAdapter<HadoopJobId, HadoopJobMetadata> jobMetaPrj; + private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaPrj; /** Projection with expiry policy for finished job updates. */ private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj; @@ -108,7 +108,7 @@ public class HadoopJobTracker extends HadoopComponent { */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaCache() { - GridCacheAdapter<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj; + GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj; if (prj == null) { synchronized (mux) { @@ -129,7 +129,8 @@ public class HadoopJobTracker extends HadoopComponent { throw new IllegalStateException(e); } - jobMetaPrj = prj = sysCache; + jobMetaPrj = prj = (GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata>) + sysCache.projection(HadoopJobId.class, HadoopJobMetadata.class); if (ctx.configuration().getFinishedJobInfoTtl() > 0) { ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(