# IGNITE-981: Fixed wait for cache initialization on clients
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ddcb2a3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ddcb2a3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ddcb2a3f Branch: refs/heads/ignite-389-ipc Commit: ddcb2a3f6932fe8d3f86d3e1c16a3c4a4610959f Parents: 1603fe5 Author: sboikov <sboi...@gridgain.com> Authored: Thu Jun 4 09:25:42 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jun 4 11:50:36 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 117 +++++++------ .../dht/preloader/GridDhtPreloader.java | 2 +- .../IgniteMessagingWithClientTest.java | 164 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 1 + 4 files changed, 232 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 6e8d457..4382731 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1722,68 +1722,83 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } - Object msgBody = ioMsg.body(); - - assert msgBody != null || ioMsg.bodyBytes() != null; + busyLock.readLock(); try { - byte[] msgTopicBytes = ioMsg.topicBytes(); - - Object msgTopic = ioMsg.topic(); - - 
GridDeployment dep = ioMsg.deployment(); - - if (dep == null && ctx.config().isPeerClassLoadingEnabled() && - ioMsg.deploymentClassName() != null) { - dep = ctx.deploy().getGlobalDeployment( - ioMsg.deploymentMode(), - ioMsg.deploymentClassName(), - ioMsg.deploymentClassName(), - ioMsg.userVersion(), - nodeId, - ioMsg.classLoaderId(), - ioMsg.loaderParticipants(), - null); - - if (dep == null) - throw new IgniteDeploymentCheckedException( - "Failed to obtain deployment information for user message. " + - "If you are using custom message or topic class, try implementing " + - "GridPeerDeployAware interface. [msg=" + ioMsg + ']'); - - ioMsg.deployment(dep); // Cache deployment. + if (stopping) { + if (log.isDebugEnabled()) + log.debug("Received user message while stopping (will ignore) [nodeId=" + + nodeId + ", msg=" + msg + ']'); + + return; } - // Unmarshall message topic if needed. - if (msgTopic == null && msgTopicBytes != null) { - msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null); + Object msgBody = ioMsg.body(); - ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings. - } + assert msgBody != null || ioMsg.bodyBytes() != null; - if (!F.eq(topic, msgTopic)) - return; + try { + byte[] msgTopicBytes = ioMsg.topicBytes(); + + Object msgTopic = ioMsg.topic(); + + GridDeployment dep = ioMsg.deployment(); + + if (dep == null && ctx.config().isPeerClassLoadingEnabled() && + ioMsg.deploymentClassName() != null) { + dep = ctx.deploy().getGlobalDeployment( + ioMsg.deploymentMode(), + ioMsg.deploymentClassName(), + ioMsg.deploymentClassName(), + ioMsg.userVersion(), + nodeId, + ioMsg.classLoaderId(), + ioMsg.loaderParticipants(), + null); + + if (dep == null) + throw new IgniteDeploymentCheckedException( + "Failed to obtain deployment information for user message. " + + "If you are using custom message or topic class, try implementing " + + "GridPeerDeployAware interface. 
[msg=" + ioMsg + ']'); + + ioMsg.deployment(dep); // Cache deployment. + } - if (msgBody == null) { - msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null); + // Unmarshall message topic if needed. + if (msgTopic == null && msgTopicBytes != null) { + msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null); - ioMsg.body(msgBody); // Save body to avoid future unmarshallings. - } + ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings. + } - // Resource injection. - if (dep != null) - ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" + - msg + ']', e); - } + if (!F.eq(topic, msgTopic)) + return; + + if (msgBody == null) { + msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null); + + ioMsg.body(msgBody); // Save body to avoid future unmarshallings. + } - if (msgBody != null) { - if (predLsnr != null) { - if (!predLsnr.apply(nodeId, msgBody)) - removeMessageListener(TOPIC_COMM_USER, this); + // Resource injection. 
+ if (dep != null) + ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody); } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" + + msg + ']', e); + } + + if (msgBody != null) { + if (predLsnr != null) { + if (!predLsnr.apply(nodeId, msgBody)) + removeMessageListener(TOPIC_COMM_USER, this); + } + } + } + finally { + busyLock.readUnlock(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 1aef18c..51010ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -274,7 +274,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> syncFuture() { - return demandPool.syncFuture(); + return cctx.kernalContext().clientNode() ? 
startFut : demandPool.syncFuture(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java new file mode 100644 index 0000000..855a4f7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.messaging; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implements Serializable { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Message topic. */ + private enum TOPIC { + /** */ + ORDERED + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + if (gridName.equals(getTestGridName(2))) { + cfg.setClientMode(true); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); + } + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMessageSendWithClientJoin() throws Exception { + startGrid(0); + + Ignite ignite1 = startGrid(1); + + ClusterGroup rmts = ignite1.cluster().forRemotes(); + + IgniteMessaging msg = ignite1.message(rmts); + + msg.localListen(TOPIC.ORDERED, new LocalListener()); + + msg.remoteListen(TOPIC.ORDERED, new RemoteListener()); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int iter = 0; + + while (!stop.get()) { + if (iter % 10 == 0) + log.info("Client start/stop iteration: " + iter); + + iter++; + + try (Ignite ignite = startGrid(2)) { + assertTrue(ignite.configuration().isClientMode()); + } + } + + return null; + } + }, 1, "client-start-stop"); + + try { + long stopTime = U.currentTimeMillis() + 30_000; + + int iter = 0; + + while (System.currentTimeMillis() < stopTime) { + try { + ignite1.message(rmts).sendOrdered(TOPIC.ORDERED, Integer.toString(iter), 0); + } + catch (IgniteException e) { + log.info("Message send failed: " + e); + } + + iter++; + + if (iter % 100 == 0) + Thread.sleep(5); + } + } + finally { + stop.set(true); + } + + fut.get(); + } + + /** + * + */ + private static class LocalListener implements IgniteBiPredicate<UUID, String> { + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, String s) { + return true; + } + } + + /** + * + */ + private static class RemoteListener implements IgniteBiPredicate<UUID, String> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID nodeId, String msg) { + ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg); + + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 9eb31f1..e0a1e6e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -53,6 +53,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTest(new TestSuite(GridSelfTest.class)); suite.addTest(new TestSuite(GridProjectionSelfTest.class)); suite.addTest(new TestSuite(GridMessagingSelfTest.class)); + suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class)); suite.addTest(new TestSuite(GridMessagingNoPeerClassLoadingSelfTest.class)); if (U.isLinux() || U.isMacOs())