# ignite-943 fix services on client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/99234d0a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/99234d0a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/99234d0a Branch: refs/heads/ignite-389 Commit: 99234d0ac5fd2e1d1033b1930cb4d7d6d49e0f28 Parents: 477a215 Author: sboikov <sboi...@gridgain.com> Authored: Fri May 29 16:39:00 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri May 29 16:39:00 2015 +0300 ---------------------------------------------------------------------- .../service/GridServiceProcessor.java | 104 ++++++++++++++++++- .../service/ClosureServiceClientsNodesTest.java | 10 ++ 2 files changed, 110 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99234d0a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index a9a15e9..aedac75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -59,6 +60,12 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** Time to wait before reassignment retries. */ private static final long RETRY_TIMEOUT = 1000; + /** */ + private static final ServiceDeploymentPredicate DEPLOYMENT_PREDICATE = new ServiceDeploymentPredicate(); + + /** */ + private static final ServiceAssignmentsPredicate ASSIGNMENTS_PREDICATE = new ServiceAssignmentsPredicate(); + /** Local service instances. */ private final Map<String, Collection<ServiceContextImpl>> locSvcs = new HashMap<>(); @@ -345,7 +352,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { "different configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']')); } else { - for (Cache.Entry<Object, Object> e : cache.entrySetx()) { + Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ASSIGNMENTS_PREDICATE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); + if (e.getKey() instanceof GridServiceAssignmentsKey) { GridServiceAssignments assigns = (GridServiceAssignments)e.getValue(); @@ -437,7 +448,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { public IgniteInternalFuture<?> cancelAll() { Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(); - for (Cache.Entry<Object, Object> e : cache.entrySetx()) { + Iterator<Cache.Entry<Object, Object>> it = serviceEntries(DEPLOYMENT_PREDICATE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); + if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; @@ -456,7 +471,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { public Collection<ServiceDescriptor> serviceDescriptors() { Collection<ServiceDescriptor> descs = new ArrayList<>(); - for (Cache.Entry<Object, Object> e : cache.entrySetx()) { + Iterator<Cache.Entry<Object, Object>> it = serviceEntries(DEPLOYMENT_PREDICATE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); + if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; @@ -904,6 +923,43 @@ public class GridServiceProcessor extends GridProcessorAdapter { } /** + * @param p Entry predicate used to execute query from client node. + * @return Service deployment entries. + */ + @SuppressWarnings("unchecked") + private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) { + if (!cache.context().affinityNode()) { + ClusterNode oldestSrvNode = + CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); + + if (oldestSrvNode == null) + return F.emptyIterator(); + + GridCacheQueryManager qryMgr = cache.context().queries(); + + CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, false); + + qry.keepAll(false); + + qry.projection(ctx.cluster().get().forNode(oldestSrvNode)); + + return cache.context().itHolder().iterator(qry.execute(), + new CacheIteratorConverter<Object, Map.Entry<Object,Object>>() { + @Override protected Object convert(Map.Entry<Object, Object> e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected void remove(Object item) { + throw new UnsupportedOperationException(); + } + } + ); + } + else + return cache.entrySetx().iterator(); + } + + /** * Service deployment listener. */ private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> { @@ -1056,7 +1112,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.cache().context().deploy().ignoreOwnership(true); try { - for (Cache.Entry<Object, Object> e : cache.entrySetx()) { + Iterator<Cache.Entry<Object, Object>> it = serviceEntries(DEPLOYMENT_PREDICATE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); + if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; @@ -1265,4 +1325,40 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ public abstract void run0(); } + + /** + * + */ + static class ServiceDeploymentPredicate implements IgniteBiPredicate<Object, Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key instanceof GridServiceDeploymentKey; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceDeploymentPredicate.class, this); + } + } + + /** + * + */ + static class ServiceAssignmentsPredicate implements IgniteBiPredicate<Object, Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key instanceof GridServiceAssignmentsKey; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceAssignmentsPredicate.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99234d0a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java index 59594be..faccc9a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java @@ -84,6 +84,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { srvNames.add(getTestGridName(i)); for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); Collection<String> res = ignite.compute().broadcast(new IgniteCallable<String>() { @@ -109,6 +111,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { */ public void testClientClosure() throws Exception { for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); Collection<String> res = ignite.compute(ignite.cluster().forClients()). @@ -134,6 +138,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { */ public void testCustomClosure() throws Exception { for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); Collection<String> res = ignite.compute(ignite.cluster().forPredicate(F.<ClusterNode>alwaysTrue())). @@ -157,6 +163,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { UUID clientNodeId = grid(0).cluster().localNode().id(); for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); ignite.services().deployNodeSingleton(SINGLETON_NAME, new TestService()); @@ -190,6 +198,8 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { UUID clientNodeId = grid(0).cluster().localNode().id(); for (int i = 0 ; i < NODES_CNT; i++) { + log.info("Iteration: " + i); + Ignite ignite = grid(i); ignite.services(ignite.cluster().forClients()).deployNodeSingleton(SINGLETON_NAME, new TestService());