# Test for ignite-1245.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/aed83af5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/aed83af5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/aed83af5 Branch: refs/heads/ignite-426 Commit: aed83af5f76c47bc9e4d0e8f60955fc6c6b42aac Parents: 36f7ba6 Author: sboikov <sboi...@gridgain.com> Authored: Thu Aug 13 13:05:43 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Aug 13 13:05:43 2015 +0300 ---------------------------------------------------------------------- .../IgniteCacheContinuousQueryClientTest.java | 114 +++++++++++++++++++ 1 file changed, 114 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aed83af5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java new file mode 100644 index 0000000..bb413a0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.query.continuous;

import org.apache.ignite.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;

import javax.cache.event.*;
import java.util.concurrent.*;

import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;

/**
 * Checks that a continuous query registered from a client node receives an update
 * made through a server node that joined the topology after the query was started.
 */
public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest {
    /** IP finder shared by every node started by this test. */
    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** Whether the next node started via {@link #getConfiguration(String)} is a client. */
    private boolean client;

    /**
     * Builds a node configuration with a single partitioned, atomic, fully synchronous
     * cache; the node is started in client mode when {@link #client} is set.
     *
     * @param gridName Name of the grid being started.
     * @return Node configuration.
     * @throws Exception If the base configuration could not be created.
     */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        // Wire the shared finder into discovery; previously it was declared but never used.
        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);

        CacheConfiguration ccfg = new CacheConfiguration();

        ccfg.setCacheMode(PARTITIONED);
        ccfg.setAtomicityMode(ATOMIC);
        ccfg.setWriteSynchronizationMode(FULL_SYNC);

        cfg.setCacheConfiguration(ccfg);

        cfg.setClientMode(client);

        return cfg;
    }

    /**
     * Stops every node started by the test so that nothing leaks into subsequent tests.
     *
     * @throws Exception If failed.
     */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        stopAllGrids();
    }

    /**
     * Starts two server nodes and one client node, registers a continuous query from the
     * client, then starts one more server node and puts a key that is primary on it. The
     * client-side listener is expected to be notified within five seconds.
     *
     * @throws Exception If failed.
     */
    public void testNodeJoins() throws Exception {
        startGrids(2);

        client = true;

        Ignite clientNode = startGrid(3);

        client = false;

        assertTrue("Node was expected to start in client mode.", clientNode.configuration().isClientMode());

        CacheEventListener lsnr = new CacheEventListener();

        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();

        qry.setLocalListener(lsnr);

        // try-with-resources guarantees the cursor is closed even when the assertion below fails.
        try (QueryCursor<?> cur = clientNode.cache(null).query(qry)) {
            Ignite joined = startGrid(4);

            IgniteCache<Object, Object> joinedCache = joined.cache(null);

            joinedCache.put(primaryKey(joinedCache), 1);

            assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
        }
    }

    /**
     * Local continuous query listener that logs every received event and releases
     * {@link #latch} once the first one arrives.
     */
    private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
        /** Released when the first cache event is delivered. */
        private final CountDownLatch latch = new CountDownLatch(1);

        /** Logger; annotated for resource injection. */
        @LoggerResource
        private IgniteLogger log;

        /** {@inheritDoc} */
        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts) {
                log.info("Received cache event: " + evt);

                latch.countDown();
            }
        }
    }
}