IGNITE-158 implement ClusterGroup#forClientNodes() and ClusterGroup#forClientNodes().
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1b9629d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1b9629d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1b9629d0 Branch: refs/heads/ignite-6 Commit: 1b9629d07ef7f2a7dd7a43ae41296e79a9a9612b Parents: f04e0ec Author: sevdokimov <sevdoki...@gridgain.com> Authored: Mon Feb 2 13:51:34 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Mon Feb 2 13:51:34 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cluster/ClusterGroup.java | 23 ++++++++ .../ignite/internal/ClusterGroupAdapter.java | 55 ++++++++++++++++++-- .../ignite/internal/IgniteClusterAsyncImpl.java | 10 ++++ .../GridProjectionForCachesSelfTest.java | 44 ++++++++++++++-- ...idHadoopDefaultMapReducePlannerSelfTest.java | 10 ++++ 5 files changed, 134 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java index 9f2f435..6267dbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterGroup.java @@ -18,6 +18,7 @@ package org.apache.ignite.cluster; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -148,6 +149,28 @@ public interface ClusterGroup { public ClusterGroup forCacheNodes(String cacheName, @Nullable String... cacheNames); /** + * Creates projection for all nodes that have cache with specified name running and cache distribution mode is + * {@link CacheDistributionMode#PARTITIONED_ONLY} or {@link CacheDistributionMode#NEAR_PARTITIONED}. + * + * @param cacheName Cache name. + * @param cacheNames Optional additional cache names to include into projection. + * @return Projection over nodes that have specified cache running. + * @see CacheConfiguration#getDistributionMode() + */ + public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames); + + /** + * Creates projection for all nodes that have cache with specified name running and cache distribution mode is + * {@link CacheDistributionMode#CLIENT_ONLY} or {@link CacheDistributionMode#NEAR_ONLY}. + * + * @param cacheName Cache name. + * @param cacheNames Optional additional cache names to include into projection. + * @return Projection over nodes that have specified cache running. + * @see CacheConfiguration#getDistributionMode() + */ + public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames); + + /** * Creates projection for all nodes that have streamer with specified name running. * * @param streamerName Streamer name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java index 7dff797..fafb23c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; import org.apache.ignite.internal.executor.*; import org.apache.ignite.internal.util.typedef.*; @@ -525,7 +527,17 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** {@inheritDoc} */ @Override public final ClusterGroup forCacheNodes(@Nullable String cacheName, @Nullable String... cacheNames) { - return forPredicate(new CachesFilter(cacheName, cacheNames)); + return forPredicate(new CachesFilter(cacheName, cacheNames, null)); + } + + /** {@inheritDoc} */ + @Override public final ClusterGroup forDataNodes(@Nullable String cacheName, @Nullable String... cacheNames) { + return forPredicate(new CachesFilter(cacheName, cacheNames, CachesFilter.DATA_MODES)); + } + + /** {@inheritDoc} */ + @Override public final ClusterGroup forClientNodes(@Nullable String cacheName, @Nullable String... cacheNames) { + return forPredicate(new CachesFilter(cacheName, cacheNames, CachesFilter.CLIENT_MODES)); } /** {@inheritDoc} */ @@ -652,6 +664,14 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { */ private static class CachesFilter implements IgnitePredicate<ClusterNode> { /** */ + private static final Set<CacheDistributionMode> DATA_MODES = EnumSet.of(CacheDistributionMode.NEAR_PARTITIONED, + CacheDistributionMode.PARTITIONED_ONLY); + + /** */ + private static final Set<CacheDistributionMode> CLIENT_MODES = EnumSet.of(CacheDistributionMode.CLIENT_ONLY, + CacheDistributionMode.NEAR_ONLY); + + /** */ private static final long serialVersionUID = 0L; /** Cache name. */ @@ -660,27 +680,54 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** Cache names. */ private final String[] cacheNames; + /** */ + private final Set<CacheDistributionMode> distributionMode; + /** * @param cacheName Cache name. * @param cacheNames Cache names. + * @param distributionMode Filter by {@link CacheConfiguration#getDistributionMode()}. */ - private CachesFilter(@Nullable String cacheName, @Nullable String[] cacheNames) { + private CachesFilter(@Nullable String cacheName, @Nullable String[] cacheNames, + @Nullable Set<CacheDistributionMode> distributionMode) { this.cacheName = cacheName; this.cacheNames = cacheNames; + this.distributionMode = distributionMode; } /** {@inheritDoc} */ @Override public boolean apply(ClusterNode n) { - if (!U.hasCache(n, cacheName)) + GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); + + if (caches == null) + return false; + + if (!hasCache(caches, cacheName, distributionMode)) return false; if (!F.isEmpty(cacheNames)) for (String cn : cacheNames) - if (!U.hasCache(n, cn)) + if (!hasCache(caches, cn, distributionMode)) return false; return true; } + + /** + * @param cacheName Cache name to check. + * @param distributionMode Filter by {@link CacheConfiguration#getDistributionMode()}. + * @return {@code true} if given node has specified cache started. + */ + public static boolean hasCache(GridCacheAttributes[] caches, @Nullable String cacheName, + @Nullable Collection<CacheDistributionMode> distributionMode) { + for (GridCacheAttributes attrs : caches) { + if (Objects.equals(cacheName, attrs.cacheName()) + && (distributionMode == null || distributionMode.contains(attrs.partitionedTaxonomy()))) + return true; + } + + return false; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java index ea148ff..7e2eaf1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClusterAsyncImpl.java @@ -172,6 +172,16 @@ public class IgniteClusterAsyncImpl extends IgniteAsyncSupportAdapter<IgniteClus } /** {@inheritDoc} */ + @Override public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames) { + return grid.forDataNodes(cacheName, cacheNames); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames) { + return grid.forClientNodes(cacheName, cacheNames); + } + + /** {@inheritDoc} */ @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) { return grid.forStreamer(streamerName, streamerNames); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java index 35c8afb..8bcceb1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java @@ -53,11 +53,12 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(discoverySpi()); if (gridName.equals(getTestGridName(0))) - cfg.setCacheConfiguration(cacheConfiguration(null)); + cfg.setCacheConfiguration(cacheConfiguration(null, CacheDistributionMode.PARTITIONED_ONLY)); else if (gridName.equals(getTestGridName(1))) - cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME)); + cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME, CacheDistributionMode.NEAR_ONLY)); else if (gridName.equals(getTestGridName(2)) || gridName.equals(getTestGridName(3))) - cfg.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(CACHE_NAME)); + cfg.setCacheConfiguration(cacheConfiguration(null, CacheDistributionMode.CLIENT_ONLY), + cacheConfiguration(CACHE_NAME, CacheDistributionMode.NEAR_PARTITIONED)); else cfg.setCacheConfiguration(); @@ -79,11 +80,14 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest { * @param cacheName Cache name. * @return Cache configuration. */ - private CacheConfiguration cacheConfiguration(@Nullable String cacheName) { + private CacheConfiguration cacheConfiguration(@Nullable String cacheName, CacheDistributionMode distributionMode) { CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setName(cacheName); cfg.setCacheMode(PARTITIONED); + + cfg.setDistributionMode(distributionMode); + cfg.setBackups(1); return cfg; @@ -153,6 +157,38 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testProjectionForDataCaches() throws Exception { + ClusterGroup prj = ignite.cluster().forDataNodes(null); + + assert prj != null; + assert prj.nodes().size() == 1; + assert prj.nodes().contains(grid(0).localNode()); + } + + /** + * @throws Exception If failed. + */ + public void testProjectionForClientCaches() throws Exception { + ClusterGroup prj = ignite.cluster().forClientNodes(CACHE_NAME); + + assert prj != null; + assert prj.nodes().size() == 1; + assert prj.nodes().contains(grid(1).localNode()); + } + + /** + * @throws Exception If failed. + */ + public void testProjectionForClientBothCaches() throws Exception { + ClusterGroup prj = ignite.cluster().forClientNodes(null, CACHE_NAME); + + assert prj != null; + assert prj.nodes().isEmpty(); + } + + /** + * @throws Exception If failed. + */ public void testProjectionForWrongCacheName() throws Exception { ClusterGroup prj = ignite.cluster().forCacheNodes("wrong"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b9629d0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java index 014f7c5..ca927ec 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java @@ -1145,6 +1145,16 @@ public class GridHadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstrac } /** {@inheritDoc} */ + @Override public ClusterGroup forDataNodes(String cacheName, @Nullable String... cacheNames) { + return null; + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forClientNodes(String cacheName, @Nullable String... cacheNames) { + return null; + } + + /** {@inheritDoc} */ @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) { return null; }