http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/config/router/default-router.xml ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/config/grid-client-spring-config.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/TaskEventSubjectIdSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java index be3ba1d,0000000..da9d033 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java @@@ -1,616 -1,0 +1,615 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; - import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.client.balancer.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.internal.client.ssl.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; +import org.junit.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.testframework.GridTestUtils.*; + +/** + * + */ +public abstract class ClientAbstractMultiThreadedSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Partitioned cache name. */ + protected static final String PARTITIONED_CACHE_NAME = "partitioned"; + + /** Partitioned cache with async commit and backup name. */ + protected static final String PARTITIONED_ASYNC_BACKUP_CACHE_NAME = "partitioned-async-backup"; + + /** Replicated cache name. */ + private static final String REPLICATED_CACHE_NAME = "replicated"; + + /** Replicated cache with async commit name. */ + private static final String REPLICATED_ASYNC_CACHE_NAME = "replicated-async"; + + /** Nodes count. */ + protected static final int NODES_CNT = 5; + + /** Thread count to run tests. */ + private static final int THREAD_CNT = 20; + + /** Count of tasks to run. */ + private static final int TASK_EXECUTION_CNT = 50000; + + /** Count of cache puts in tests. */ + private static final int CACHE_PUT_CNT = 10000; + + /** Topology update frequency. */ + private static final int TOP_REFRESH_FREQ = 1000; + + /** Info messages will be printed each 5000 iterations. */ + private static final int STATISTICS_PRINT_STEP = 5000; + + /** Host. */ + public static final String HOST = "127.0.0.1"; + + /** Base for tcp rest ports. */ + public static final int REST_TCP_PORT_BASE = 12345; + + static { - System.setProperty("CLIENTS_MODULE_PATH", U.resolveGridGainPath("modules/clients").getAbsolutePath()); ++ System.setProperty("CLIENTS_MODULE_PATH", U.resolveIgnitePath("modules/clients").getAbsolutePath()); + } + + /** Client instance for each test. */ + private GridClient client; + + /** + * @return Client protocol that should be used. + */ + protected abstract GridClientProtocol protocol(); + + /** + * @return Server address to create first connection. + */ + protected abstract String serverAddress(); + + /** + * @return Whether SSL should be used. + */ + protected abstract boolean useSsl(); + + /** + * @return SSL context factory to use if SSL is enabled. + */ + protected abstract GridSslContextFactory sslContextFactory(); + + /** + * @return Count of iterations for sync commit test. + */ + protected int syncCommitIterCount() { + return 1000; + } + + /** + * @return Topology refresh frequency interval. + */ + protected int topologyRefreshFrequency() { + return TOP_REFRESH_FREQ; + } + + /** + * @return Max connection idle time. + */ + protected int maxConnectionIdleTime() { + return 5000; + } + + /** + * @return Number of tasks that should be executed during test. + */ + protected int taskExecutionCount() { + return TASK_EXECUTION_CNT; + } + + /** + * @return Number of puts to the cache. + */ + protected int cachePutCount() { + return CACHE_PUT_CNT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setLocalHost(HOST); + + assert c.getConnectorConfiguration() == null; + + ConnectorConfiguration clientCfg = new ConnectorConfiguration(); + + clientCfg.setPort(REST_TCP_PORT_BASE); + + if (useSsl()) { + clientCfg.setSslEnabled(true); + + clientCfg.setSslContextFactory(sslContextFactory()); + } + + c.setConnectorConfiguration(clientCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + c.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(PARTITIONED_CACHE_NAME), + cacheConfiguration(REPLICATED_CACHE_NAME), cacheConfiguration(PARTITIONED_ASYNC_BACKUP_CACHE_NAME), + cacheConfiguration(REPLICATED_ASYNC_CACHE_NAME)); + + return c; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setDistributionMode(NEAR_PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + + if (cacheName == null) + cfg.setCacheMode(LOCAL); + else if (PARTITIONED_CACHE_NAME.equals(cacheName)) { + cfg.setCacheMode(PARTITIONED); + + cfg.setBackups(0); + } + else if (PARTITIONED_ASYNC_BACKUP_CACHE_NAME.equals(cacheName)) { + cfg.setCacheMode(PARTITIONED); + + cfg.setBackups(1); + } + else + cfg.setCacheMode(REPLICATED); + + cfg.setName(cacheName); + + if (cacheName != null && !cacheName.contains("async")) + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + client = GridClientFactory.start(clientConfiguration()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + GridClientFactory.stop(client.id(), false); + + client = null; + } + + /** + * @throws Exception If failed. + */ + public void testSyncCommitFlagReplicated() throws Exception { + doTestSyncCommitFlag(client.data(REPLICATED_ASYNC_CACHE_NAME)); + } + + /** + * @throws Exception If failed. + */ + public void testSyncCommitFlagPartitioned() throws Exception { + doTestSyncCommitFlag(client.data(PARTITIONED_ASYNC_BACKUP_CACHE_NAME)); + } + + /** + * Extracts array from given iterator. + * + * @param nodes Iterator of nodes. + * @return Nodes array. + */ + private GridClientNode[] toArray(Iterator<? extends GridClientNode> nodes) { + ArrayList<GridClientNode> res = new ArrayList<>(); + + while (nodes.hasNext()) + res.add(nodes.next()); + + return res.toArray(new GridClientNode[res.size()]); + } + + /** + * Runs test on SYNC_COMMIT flag. + * + * @param data Client data to run test on. + * @throws Exception If failed. + */ + private void doTestSyncCommitFlag(final GridClientData data) throws Exception { + final String key = "k0"; + + Collection<UUID> affNodesIds = F.viewReadOnly( + grid(0).cache(data.cacheName()).affinity().mapKeyToPrimaryAndBackups(key), + F.node2id()); + + final GridClientData dataFirst = data.pinNodes(F.first(client.compute().nodes())); + + List<GridClientNode> affNodes = new ArrayList<>(); + + for (GridClientNode node : client.compute().nodes()) { + if (affNodesIds.contains(node.nodeId())) + affNodes.add(node); + } + + Assert.assertFalse(affNodes.isEmpty()); + + Iterator<? extends GridClientNode> it = affNodes.iterator(); + + final GridClientData dataOthers = data.pinNodes(it.next(), toArray(it)); + + for (int i = 0; i < syncCommitIterCount(); i++) { + final CountDownLatch l = new CountDownLatch(1); + + final String val = "v" + i; + + IgniteInternalFuture<?> f = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + l.await(); + + assertEquals(val, dataOthers.get(key)); + + return null; + } + }, THREAD_CNT); + + dataFirst.flagsOn(GridClientCacheFlag.SYNC_COMMIT).put(key, val); + + l.countDown(); + + f.get(); + } + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedTaskRun() throws Exception { + final AtomicLong cnt = new AtomicLong(); + + final AtomicReference<GridClientException> err = new AtomicReference<>(); + + final ConcurrentLinkedQueue<String> execQueue = new ConcurrentLinkedQueue<>(); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @Override + public void run() { + long processed; + + while ((processed = cnt.getAndIncrement()) < taskExecutionCount()) { + try { + if (processed > 0 && processed % STATISTICS_PRINT_STEP == 0) + info(">>>>>>> " + processed + " tasks finished."); + + String res = client.compute().execute(TestTask.class.getName(), null); + + execQueue.add(res); + } + catch (GridClientException e) { + err.compareAndSet(null, e); + } + } + } + }, THREAD_CNT, "client-task-request"); + + fut.get(); + + if (err.get() != null) + throw new Exception(err.get()); + + assertEquals(taskExecutionCount(), execQueue.size()); + + // With round-robin balancer each node must receive equal count of task requests. + Collection<String> executionIds = new HashSet<>(execQueue); + + assertTrue(executionIds.size() == NODES_CNT); + + Map<String, AtomicInteger> statisticsMap = new HashMap<>(); + + for (String id : executionIds) + statisticsMap.put(id, new AtomicInteger()); + + for (String id : execQueue) + statisticsMap.get(id).incrementAndGet(); + + info(">>>>>>> Execution statistics per node:"); + + for (Map.Entry<String, AtomicInteger> e : statisticsMap.entrySet()) + info(">>>>>>> " + e.getKey() + " run " + e.getValue().get() + " tasks"); + } + + /** + * @throws Exception If failed. + */ + public void test6Affinity() throws Exception { + GridClientData cache = client.data(PARTITIONED_CACHE_NAME); + UUID nodeId = cache.affinity("6"); + + info("Affinity node: " + nodeId); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedCachePut() throws Exception { + final AtomicLong keyCnt = new AtomicLong(); + + final AtomicReference<Exception> err = new AtomicReference<>(); + + final ConcurrentMap<String, T2<UUID, String>> puts = new ConcurrentHashMap<>(); + + final Map<UUID, Ignite> gridMap = new HashMap<>(); + + for (int i = 0; i < NODES_CNT; i++) { + Ignite g = grid(i); + + gridMap.put(g.cluster().localNode().id(), g); + } + + final Ignite ignite = F.first(gridMap.values()); + + assertEquals(NODES_CNT, client.compute().refreshTopology(false, false).size()); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @SuppressWarnings("OverlyStrongTypeCast") + @Override public void run() { + try { + GridClientData cache = client.data(PARTITIONED_CACHE_NAME); + + assertEquals(NODES_CNT, ((GridClientDataImpl)cache).projectionNodes().size()); + + long rawKey; + + while ((rawKey = keyCnt.getAndIncrement()) < cachePutCount()) { + String key = String.valueOf(rawKey); + + UUID nodeId = cache.affinity(key); + UUID srvNodeId = ignite.cluster().mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); + + if (!nodeId.equals(srvNodeId)) { + //GridClientDataAffinity clAff = + // ((GridClientConfiguration)getFieldValue(client, "cfg")). + // getDataConfiguration(PARTITIONED_CACHE_NAME).getAffinity(); + + //printAffinityState(gridMap.values()); + //info("Client affinity: " + clAff); + + info("Got wrong client mapping [key=" + key + ", exp=" + srvNodeId + + ", actual=" + nodeId + "]"); + } + + String val = "val" + rawKey; + + if (cache.put(key, val)) { + T2<UUID, String> old = puts.putIfAbsent(key, new T2<>(nodeId, val)); + + assert old == null : "Map contained entry [key=" + rawKey + ", entry=" + old + ']'; + } + } + } + catch (Exception e) { + err.compareAndSet(null, e); + } + } + }, THREAD_CNT, "client-cache-put"); + + fut.get(); + + if (err.get() != null) + throw new Exception(err.get()); + + assertEquals(cachePutCount(), puts.size()); + + // Now check that all puts went to primary nodes. + for (long i = 0; i < cachePutCount(); i++) { + String key = String.valueOf(i); + + ClusterNode node = ignite.cluster().mapKeyToNode(PARTITIONED_CACHE_NAME, key); + + if (!puts.get(key).get2().equals(gridMap.get(node.id()).cache(PARTITIONED_CACHE_NAME).peek(key))) { + // printAffinityState(gridMap.values()); + + failNotEquals("Node don't have value for key [nodeId=" + node.id() + ", key=" + key + "]", + puts.get(key).get2(), gridMap.get(node.id()).cache(PARTITIONED_CACHE_NAME).peek(key)); + } + + // Assert that client has properly determined affinity node. + if (!node.id().equals(puts.get(key).get1())) { + //GridClientDataAffinity clAff = + // ((GridClientConfiguration)getFieldValue(client, "cfg")). + // getDataConfiguration(PARTITIONED_CACHE_NAME).getAffinity(); + + //printAffinityState(gridMap.values()); + //info("Client affinity: " + clAff); + + UUID curAffNode = client.data(PARTITIONED_CACHE_NAME).affinity(key); + + failNotEquals( + "Got different mappings [key=" + key + ", currId=" + curAffNode + "]", + node.id(), puts.get(key).get1()); + } + + // Check that no other nodes see this key. + for (UUID id : F.view(gridMap.keySet(), F.notEqualTo(node.id()))) + assertNull("Got value in near cache.", gridMap.get(id).cache(PARTITIONED_CACHE_NAME).peek(key)); + } + + for (Ignite g : gridMap.values()) - g.cache(PARTITIONED_CACHE_NAME).clearAll(); ++ g.cache(PARTITIONED_CACHE_NAME).clear(); + } + + /** + * @param grids Collection for Grids to print affinity info. + */ + private void printAffinityState(Iterable<Ignite> grids) { + for (Ignite g : grids) { + GridAffinityAssignmentCache affCache = getFieldValue( + ((IgniteKernal)g).internalCache(PARTITIONED_CACHE_NAME).context().affinity(), + "aff"); + + CacheAffinityFunction aff = getFieldValue(affCache, "aff"); + + info("Affinity [nodeId=" + g.cluster().localNode().id() + ", affinity=" + aff + "]"); + } + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000; + } + + /** + * Creates client that will try to connect to only first node in grid. + * + * @return Client. + */ + private GridClientConfiguration clientConfiguration() { + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setTopologyRefreshFrequency(topologyRefreshFrequency()); + cfg.setMaxConnectionIdleTime(maxConnectionIdleTime()); + + cfg.setProtocol(protocol()); + cfg.setServers(Arrays.asList(serverAddress())); + cfg.setBalancer(new GridClientRoundRobinBalancer()); + + if (useSsl()) + cfg.setSslContextFactory(sslContextFactory()); + + GridClientDataConfiguration loc = new GridClientDataConfiguration(); + + GridClientDataConfiguration partitioned = new GridClientDataConfiguration(); + partitioned.setName(PARTITIONED_CACHE_NAME); + partitioned.setAffinity(new GridClientPartitionAffinity()); + + GridClientDataConfiguration partitionedAsyncBackup = new GridClientDataConfiguration(); + partitionedAsyncBackup.setName(PARTITIONED_ASYNC_BACKUP_CACHE_NAME); + partitionedAsyncBackup.setAffinity(new GridClientPartitionAffinity()); + + GridClientDataConfiguration replicated = new GridClientDataConfiguration(); + replicated.setName(REPLICATED_CACHE_NAME); + + GridClientDataConfiguration replicatedAsync = new GridClientDataConfiguration(); + replicatedAsync.setName(REPLICATED_ASYNC_CACHE_NAME); + + cfg.setDataConfigurations(Arrays.asList(loc, partitioned, replicated, replicatedAsync, partitionedAsyncBackup)); + + return cfg; + } + + /** + * Test task. Returns a tuple in which first component is id of node that has split the task, + * and second component is count of nodes that executed jobs. + */ + private static class TestTask extends ComputeTaskSplitAdapter<Object, String> { + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Count of tasks this job was split to. */ + private int gridSize; + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(gridSize); + + this.gridSize = gridSize; + + final String locNodeId = ignite.cluster().localNode().id().toString(); + + for (int i = 0; i < gridSize; i++) { + jobs.add(new ComputeJobAdapter() { + @Override public Object execute() { + return new IgniteBiTuple<>(locNodeId, 1); + } + }); + } + + return jobs; + } + + /** {@inheritDoc} */ + @Override public String reduce(List<ComputeJobResult> results) { + int sum = 0; + + String locNodeId = null; + + for (ComputeJobResult res : results) { + IgniteBiTuple<String, Integer> part = res.getData(); + + if (locNodeId == null) + locNodeId = part.get1(); + + Integer i = part.get2(); + + if (i != null) + sum += i; + } + + assert gridSize == sum; + + return locNodeId; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java index 3b35e07,0000000..59b4c53 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java @@@ -1,222 -1,0 +1,220 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.net.*; +import java.nio.charset.*; +import java.util.*; + +import static org.apache.ignite.internal.client.GridClientProtocol.*; +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * Tests that client is able to connect to a grid with only default cache enabled. + */ +public class ClientDefaultCacheSelfTest extends GridCommonAbstractTest { + /** Path to jetty config configured with SSL. */ + private static final String REST_JETTY_CFG = "modules/clients/src/test/resources/jetty/rest-jetty.xml"; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Host. */ + private static final String HOST = "127.0.0.1"; + + /** Port. */ + private static final int TCP_PORT = 11211; + + /** Cached local node id. */ + private UUID locNodeId; + + /** Http port. */ + private static final int HTTP_PORT = 8081; + + /** Url address to send HTTP request. */ - private static final String TEST_URL = "http://" + HOST + ":" + HTTP_PORT + "/gridgain"; ++ private static final String TEST_URL = "http://" + HOST + ":" + HTTP_PORT + "/ignite"; + + /** Used to sent request charset. */ + private static final String CHARSET = StandardCharsets.UTF_8.name(); + + /** Name of node local cache. */ + private static final String LOCAL_CACHE = "local"; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty(IGNITE_JETTY_PORT, String.valueOf(HTTP_PORT)); + + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(); + + System.clearProperty(IGNITE_JETTY_PORT); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + locNodeId = grid().localNode().id(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + assert cfg.getConnectorConfiguration() == null; + + ConnectorConfiguration clientCfg = new ConnectorConfiguration(); + + clientCfg.setJettyPath(REST_JETTY_CFG); + + cfg.setConnectorConfiguration(clientCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration cLocal = new CacheConfiguration(); + + cLocal.setName(LOCAL_CACHE); + + cLocal.setCacheMode(CacheMode.LOCAL); + + cLocal.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + cfg.setCacheConfiguration(defaultCacheConfiguration(), cLocal); + + return cfg; + } + + /** + * @return Client. + * @throws GridClientException In case of error. + */ + private GridClient clientTcp() throws GridClientException { + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setProtocol(TCP); + cfg.setServers(getServerList(TCP_PORT)); + cfg.setDataConfigurations(Collections.singleton(new GridClientDataConfiguration())); + + GridClient gridClient = GridClientFactory.start(cfg); + + assert F.exist(gridClient.compute().nodes(), new IgnitePredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode n) { + return n.nodeId().equals(locNodeId); + } + }); + + return gridClient; + } + + /** + * Builds list of connection strings with few different ports. + * Used to avoid possible failures in case of port range active. + * + * @param startPort Port to start list from. + * @return List of client connection strings. + */ + private Collection<String> getServerList(int startPort) { + Collection<String> srvs = new ArrayList<>(); + + for (int i = startPort; i < startPort + 10; i++) + srvs.add(HOST + ":" + i); + + return srvs; + } + + /* + * Send HTTP request to Jetty server of node and process result. + * + * @param query Send query parameters. + * @return Processed response string. + */ + private String sendHttp(String query) { + String res = "No result"; + + try { + URLConnection connection = new URL(TEST_URL + "?" + query).openConnection(); + + connection.setRequestProperty("Accept-Charset", CHARSET); + + BufferedReader r = new BufferedReader(new InputStreamReader(connection.getInputStream())); + + res = r.readLine(); + + r.close(); + } + catch (IOException e) { + error("Failed to send HTTP request: " + TEST_URL + "?" + query, e); + } + + // Cut node id from response. + return res.substring(res.indexOf("\"response\"")); + } + + /** + * @throws Exception If failed. + */ + public void testTcp() throws Exception { + try { - boolean putRes = cache().putx("key", 1); - - assert putRes : "Put operation failed"; ++ jcache().put("key", 1); + + GridClient client = clientTcp(); + + Integer val = client.data().<String, Integer>get("key"); + + assert val != null; + + assert val == 1; + } + finally { + GridClientFactory.stopAll(); + } + } + + /** + * Json format string in cache should not transform to Json object on get request. + */ + public void testSkipString2JsonTransformation() { + // Put to cache JSON format string value. + assertEquals("Incorrect query response", "\"response\":true,\"sessionToken\":\"\",\"successStatus\":0}", + sendHttp("cmd=put&cacheName=" + LOCAL_CACHE + + "&key=a&val=%7B%22v%22%3A%22my%20Value%22%2C%22t%22%3A1422559650154%7D")); + + // Escape '\' symbols disappear from response string on transformation to JSON object. + assertEquals( + "Incorrect query response", + "\"response\":\"{\\\"v\\\":\\\"my Value\\\",\\\"t\\\":1422559650154}\"," + + "\"sessionToken\":\"\",\"successStatus\":0}", + sendHttp("cmd=get&cacheName=" + LOCAL_CACHE + "&key=a")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientSslNodeStartup.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientSslNodeStartup.java index bdef7eb,0000000..9387311 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientSslNodeStartup.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientSslNodeStartup.java @@@ -1,59 -1,0 +1,59 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Starts up one grid node (server) with pre-defined ports and tasks to test client-server interactions. + * <p> + * Note that different nodes cannot share the same port for rest services. If you want + * to start more than one node on the same physical machine you must provide different + * configurations for each node. Otherwise, this example would not work. + * <p> + * After this example has been started you can use pre-defined endpoints and task names in your + * client-server interactions to work with the node over secured protocols (binary over SSL or https). + * <p> + * Usually you cannot start secured and unsecured nodes in one grid, so started together + * secured and unsecured nodes belong to different grids. + * <p> + * Available endponts: + * <ul> + * <li>127.0.0.1:10443 - TCP SSL-protected endpoint.</li> + * <li>127.0.0.1:11443 - HTTP SSL-protected endpoint.</li> + * </ul> + * <p> + * Required credentials for remote client authentication: "s3cret". + */ +public class ClientSslNodeStartup { + /** + * Starts up two nodes with specified cache configuration on pre-defined endpoints. + * + * @param args Command line arguments, none required. + * @throws IgniteCheckedException In case of any exception. + */ + public static void main(String[] args) throws IgniteCheckedException { - System.setProperty("CLIENTS_MODULE_PATH", U.resolveGridGainPath("modules/clients").getAbsolutePath()); ++ System.setProperty("CLIENTS_MODULE_PATH", U.resolveIgnitePath("modules/clients").getAbsolutePath()); + + try (Ignite g = G.start("modules/clients/src/test/resources/spring-server-ssl-node.xml")) { + U.sleep(Long.MAX_VALUE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java index ecb27f0,0000000..006c168 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java @@@ -1,179 -1,0 +1,179 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.springframework.beans.factory.*; +import org.springframework.context.support.*; + +import java.net.*; +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Start node task, applicable arguments: + * <ul> + * <li>tcp</li> + * <li>http</li> + * <li>tcp+ssl</li> + * <li>http+ssl</li> + * </ul> + */ +public class ClientStartNodeTask extends TaskSingleJobSplitAdapter<String, Integer> { + /** + * Available node's configurations. + */ + private static final Map<String, String> NODE_CFG = new HashMap<String, String>() {{ + put("tcp", "modules/clients/src/test/resources/spring-server-node.xml"); + put("http", "modules/clients/src/test/resources/spring-server-node.xml"); + put("tcp+ssl", "modules/clients/src/test/resources/spring-server-ssl-node.xml"); + put("http+ssl", "modules/clients/src/test/resources/spring-server-ssl-node.xml"); + }}; + + /** */ - @IgniteLoggerResource ++ @LoggerResource + private transient IgniteLogger log; + + /** */ + @IgniteInstanceResource + private transient Ignite ignite; + + /** {@inheritDoc} */ + @Override protected Object executeJob(int gridSize, String type) { + log.info(">>> Starting new grid node [currGridSize=" + gridSize + ", arg=" + type + "]"); + + if (type == null) + throw new IllegalArgumentException("Node type to start should be specified."); + + IgniteConfiguration cfg = getConfig(type); + + // Generate unique for this VM grid name. + String gridName = cfg.getGridName() + " (" + UUID.randomUUID() + ")"; + + // Update grid name (required to be unique). + cfg.setGridName(gridName); + + // Start new node in current VM. + Ignite g = G.start(cfg); + + log.info(">>> Grid started [nodeId=" + g.cluster().localNode().id() + ", name='" + g.name() + "']"); + + return true; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } + + /** + * Load grid configuration for specified node type. + * + * @param type Node type to load configuration for. + * @return Grid configuration for specified node type. + */ + static IgniteConfiguration getConfig(String type) { + String path = NODE_CFG.get(type); + + if (path == null) + throw new IllegalArgumentException("Unsupported node type: " + type); + - URL url = U.resolveGridGainUrl(path); ++ URL url = U.resolveIgniteUrl(path); + + BeanFactory ctx = new FileSystemXmlApplicationContext(url.toString()); + + return (IgniteConfiguration)ctx.getBean("grid.cfg"); + } + + /** + * Example for start/stop node tasks. + * + * @param args Not used. + */ + public static void main(String[] args) { + String nodeType = "tcp+ssl"; + + // Start initial node = 1 + try (Ignite g = G.start(NODE_CFG.get(nodeType))) { + // Change topology. + changeTopology(g, 4, 1, nodeType); + changeTopology(g, 1, 4, nodeType); + + // Stop node by id = 0 + g.compute().execute(ClientStopNodeTask.class, g.cluster().localNode().id().toString()); + + // Wait for node stops. + //U.sleep(1000); + + assert G.allGrids().isEmpty(); + } + catch (IgniteCheckedException e) { + System.err.println("Uncaught exception: " + e.getMessage()); + + e.printStackTrace(System.err); + } + } + + /** + * Change topology. + * + * @param parent Grid to execute tasks on. + * @param add New nodes count. + * @param rmv Remove nodes count. + * @param type Type of nodes to manipulate. + * @throws IgniteCheckedException On any exception. + */ + private static void changeTopology(Ignite parent, int add, int rmv, String type) throws IgniteCheckedException { + Collection<ComputeTaskFuture<?>> tasks = new ArrayList<>(); + + IgniteCompute comp = parent.compute().withAsync(); + + // Start nodes in parallel. + while (add-- > 0) { + comp.execute(ClientStartNodeTask.class, type); + + tasks.add(comp.future()); + } + + for (ComputeTaskFuture<?> task : tasks) + task.get(); + + // Stop nodes in sequence. + while (rmv-- > 0) + parent.compute().execute(ClientStopNodeTask.class, type); + + // Wait for node stops. + //U.sleep(1000); + + Collection<String> gridNames = new ArrayList<>(); + + for (Ignite g : G.allGrids()) + gridNames.add(g.name()); + + parent.log().info(">>> Available grids: " + gridNames); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java index 27375dc,0000000..9e3b42a mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java @@@ -1,127 -1,0 +1,127 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.resources.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Stop node task, applicable arguments: + * <ul> + * <li>node id (as string) to stop or</li> + * <li>node type (see start nodes task).</li> + * </ul> + */ +public class ClientStopNodeTask extends ComputeTaskSplitAdapter<String, Integer> { + /** */ - @IgniteLoggerResource ++ @LoggerResource + private transient IgniteLogger log; + + /** */ + @IgniteInstanceResource + private transient Ignite ignite; + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) { + Collection<ComputeJob> jobs = new ArrayList<>(); + + for (int i = 0; i < gridSize; i++) + jobs.add(new StopJob(arg)); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + ComputeJobResultPolicy superRes = super.result(res, rcvd); + + // Deny failover. + if (superRes == FAILOVER) + superRes = WAIT; + + return superRes; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) { + int stoppedCnt = 0; + + for (ComputeJobResult res : results) + if (!res.isCancelled()) + stoppedCnt+=(Integer)res.getData(); + + return stoppedCnt; + } + + /** + * Stop node job it is executed on. + */ + private static class StopJob extends ComputeJobAdapter { + /** */ + private final String gridType; + + /** */ - @IgniteLoggerResource ++ @LoggerResource + private IgniteLogger log; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private StopJob(String gridType) { + this.gridType = gridType; + } + + /** {@inheritDoc} */ + @Override public Object execute() { + log.info(">>> Stop node [nodeId=" + ignite.cluster().localNode().id() + ", name='" + ignite.name() + "']"); + + String prefix = ClientStartNodeTask.getConfig(gridType).getGridName() + " ("; + + if (!ignite.name().startsWith(prefix)) { + int stoppedCnt = 0; + + for (Ignite g : G.allGrids()) + if (g.name().startsWith(prefix)) { + try { + log.info(">>> Grid stopping [nodeId=" + g.cluster().localNode().id() + + ", name='" + g.name() + "']"); + + G.stop(g.name(), true); + + stoppedCnt++; + } + catch (IllegalStateException e) { + log.warning("Failed to stop grid.", e); + } + } + + return stoppedCnt; + } + + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestRestServer.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestRestServer.java index b134591,0000000..73a3076 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestRestServer.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestRestServer.java @@@ -1,275 -1,0 +1,281 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.marshaller.*; +import org.apache.ignite.internal.client.marshaller.optimized.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.processors.rest.protocols.tcp.*; +import org.apache.ignite.internal.util.nio.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.nio.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class ClientTestRestServer { + /** */ + public static final int FIRST_SERVER_PORT = 11000; + + /** */ + public static final int SERVERS_CNT = 5; + + /** */ + private static final byte[] EMPTY_SES_TOKEN = new byte[] {}; + + /** */ + private static final Collection<GridClientNodeBean> top = new ArrayList<>(); + + /** + * + */ + static { + for (int port = FIRST_SERVER_PORT; port < FIRST_SERVER_PORT + SERVERS_CNT; port++) { + GridClientNodeBean node = new GridClientNodeBean(); + + node.setNodeId(UUID.randomUUID()); + node.setConsistentId("127.0.0.1:" + port); + node.setTcpPort(port); + node.setTcpAddresses(Arrays.asList("127.0.0.1")); + + top.add(node); + } + } + + /** */ + private final int port; + + /** */ + private volatile boolean failOnConnect; + + /** */ + private final IgniteLogger log; + + /** */ + private final AtomicInteger connCnt = new AtomicInteger(); + + /** */ + private final AtomicInteger succConnCnt = new AtomicInteger(); + + /** */ + private final AtomicInteger disconnCnt = new AtomicInteger(); + + /** */ + private GridNioServer<GridClientMessage> srv; + + /** */ + private volatile GridNioSession lastSes; + + /** + * @param port Port to listen on. + * @param failOnConnect If {@code true} than server will close connection immediately after connect. + * @param log Log. + */ + public ClientTestRestServer(int port, boolean failOnConnect, IgniteLogger log) { + this.port = port; + this.failOnConnect = failOnConnect; + this.log = log; + } + + /** + * @return Port number. + */ + public int getPort() { + return port; + } + + /** + * Starts the server. + * + * @throws IgniteCheckedException If failed. + */ + public void start() throws IgniteCheckedException { + try { + String gridName = "test"; + + srv = GridNioServer.<GridClientMessage>builder() + .address(InetAddress.getByName("127.0.0.1")) + .port(port) + .listener(new TestListener()) + .logger(log) + .selectorCount(2) + .gridName(gridName) + .byteOrder(ByteOrder.nativeOrder()) + .tcpNoDelay(true) + .directBuffer(false) + .filters( + new GridNioAsyncNotifyFilter(gridName, Executors.newFixedThreadPool(2), log), + new GridNioCodecFilter(new TestParser(), log, false) + ) + .build(); + } + catch (UnknownHostException e) { + throw new IgniteCheckedException("Failed to determine localhost address.", e); + } + + srv.start(); + } + + /** + * Stops the server. + */ + public void stop() { + assert srv != null; + + srv.stop(); + } + + /** + * @return Number of connections opened to this server. + */ + public int getConnectCount() { + return connCnt.get(); + } + + /** + * @return Number of successful connections opened to this server. + */ + public int getSuccessfulConnectCount() { + return succConnCnt.get(); + } + + /** + * @return Number of connections with this server closed by clients. + */ + public int getDisconnectCount() { + return disconnCnt.get(); + } + + /** + * Closes all opened connections. + */ + public void fail() { + assert lastSes != null; + + lastSes.close(); + + failOnConnect = true; + + resetCounters(); + } + + /** + * + */ + public void repair() { + failOnConnect = false; + } + + /** + * Resets all counters. + */ + public void resetCounters() { + connCnt.set(0); + succConnCnt.set(0); + disconnCnt.set(0); + } + + /** + * Prepares response stub. + * @param msg Mesage to respond to. + * @return Response. + */ + private static GridClientResponse makeResponseFor(GridClientMessage msg) { + GridClientResponse res = new GridClientResponse(); + + res.clientId(msg.clientId()); + res.requestId(msg.requestId()); + res.successStatus(GridClientResponse.STATUS_SUCCESS); + res.sessionToken(EMPTY_SES_TOKEN); + + return res; + } + + /** + * Test listener. + */ + private class TestListener extends GridNioServerListenerAdapter<GridClientMessage> { + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + lastSes = ses; + + connCnt.incrementAndGet(); + + if (failOnConnect) + ses.close(); + else + succConnCnt.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + disconnCnt.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void onMessage(GridNioSession ses, GridClientMessage msg) { + if (msg == GridClientPingPacket.PING_MESSAGE) + ses.send(GridClientPingPacket.PING_MESSAGE); + else if (msg instanceof GridClientAuthenticationRequest) + ses.send(makeResponseFor(msg)); + else if (msg instanceof GridClientTopologyRequest) { + GridClientResponse res = makeResponseFor(msg); + + res.result(top); + + ses.send(res); + } + else if (msg instanceof GridClientHandshakeRequest) + ses.send(GridClientHandshakeResponse.OK); + } + + /** {@inheritDoc} */ + @Override public void onSessionWriteTimeout(GridNioSession ses) { + ses.close(); + } + + /** {@inheritDoc} */ + @Override public void onSessionIdleTimeout(GridNioSession ses) { + ses.close(); + } + } + + /** + */ + private static class TestParser extends GridTcpRestParser { + /** */ + private final GridClientMarshaller marsh = new GridClientOptimizedMarshaller(); + ++ /** ++ */ ++ public TestParser() { ++ super(false); ++ } ++ + /** {@inheritDoc} */ + @Override protected GridClientMarshaller marshaller(GridNioSession ses) { + return marsh; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTopologyCacheSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTopologyCacheSelfTest.java index 7d0bcf4,0000000..9db1b9e mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTopologyCacheSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTopologyCacheSelfTest.java @@@ -1,289 -1,0 +1,288 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + - import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Tests topology caching. + */ +public class ClientTopologyCacheSelfTest extends GridCommonAbstractTest { + static { + // Override default port. + System.setProperty(IGNITE_JETTY_PORT, Integer.toString(8081)); + } + + /** Host. */ + public static final String HOST = "127.0.0.1"; + + /** Port. */ + public static final int BINARY_PORT = 11212; + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(); + } + + /** + * @throws Exception If failed. + */ + public void testTopologyCache() throws Exception { + testTopologyCache( + true, // metricsCache + true, // attrsCache + false,// autoFetchMetrics + false,// autoFetchAttrs + false,// metricsBeforeRefresh + false,// attrsBeforeRefresh + true, // metricsAfterRefresh + true);// attrsAfterRefresh + + testTopologyCache( + false, // metricsCache + false, // attrsCache + false,// autoFetchMetrics + false,// autoFetchAttrs + false, // metricsBeforeRefresh + false, // attrsBeforeRefresh + false, // metricsAfterRefresh + false);// attrsAfterRefresh + + testTopologyCache( + true, // metricsCache + false, // attrsCache + false, // autoFetchMetrics + false, // autoFetchAttrs + false, // metricsBeforeRefresh + false, // attrsBeforeRefresh + true, // metricsAfterRefresh + false);// attrsAfterRefresh + + testTopologyCache( + false, // metricsCache + true, // attrsCache + false, // autoFetchMetrics + false, // autoFetchAttrs + false, // metricsBeforeRefresh + false, // attrsBeforeRefresh + false, // metricsAfterRefresh + true); // attrsAfterRefresh + } + + public void testAutofetch() throws Exception { + testTopologyCache( + true, // metricsCache + true, // attrsCache + true, // autoFetchMetrics + true, // autoFetchAttrs + true, // metricsBeforeRefresh + true, // attrsBeforeRefresh + true, // metricsAfterRefresh + true);// attrsAfterRefresh + + testTopologyCache( + true, // metricsCache + true, // attrsCache + false,// autoFetchMetrics + true, // autoFetchAttrs + false,// metricsBeforeRefresh + true, // attrsBeforeRefresh + true, // metricsAfterRefresh + true);// attrsAfterRefresh + + testTopologyCache( + true, // metricsCache + true, // attrsCache + true, // autoFetchMetrics + false,// autoFetchAttrs + true, // metricsBeforeRefresh + false,// attrsBeforeRefresh + true, // metricsAfterRefresh + true);// attrsAfterRefresh + + testTopologyCache( + true, // metricsCache + true, // attrsCache + false,// autoFetchMetrics + false,// autoFetchAttrs + false,// metricsBeforeRefresh + false,// attrsBeforeRefresh + true, // metricsAfterRefresh + true);// attrsAfterRefresh + } + + /** + * Starts new client with the given caching configuration and refreshes topology, + * Checks node metrics and attributes availability according to the given flags + * before and after refresh. + * + * @param metricsCache Should metrics be cached? + * @param attrsCache Should attributes be cached? + * @param autoFetchMetrics Should metrics be fetched automatically? + * @param autoFetchAttrs Should attributes be fetched automatically? + * @param metricsBeforeRefresh Should metrics be available before topology refresh? + * @param attrsBeforeRefresh Should attributes be available before topology refresh? + * @param metricsAfterRefresh Should metrics be available after topology refresh? + * @param attrsAfterRefresh Should attributes be available after topology refresh? + * @throws Exception If failed. + */ + private void testTopologyCache(boolean metricsCache, boolean attrsCache, + boolean autoFetchMetrics, boolean autoFetchAttrs, + boolean metricsBeforeRefresh, boolean attrsBeforeRefresh, + boolean metricsAfterRefresh, boolean attrsAfterRefresh) throws Exception { + GridClient client = client(metricsCache, attrsCache, autoFetchMetrics, autoFetchAttrs); + + try { + // Exclude cache metrics because there is no background refresh for them. + assertEquals(metricsBeforeRefresh, metricsAvailable(client, false)); + assertEquals(attrsBeforeRefresh, attrsAvailable(client)); + + client.compute().refreshTopology(true, true); + client.data(CACHE_NAME).metrics(); + + assertEquals(metricsAfterRefresh, metricsAvailable(client, true)); + assertEquals(attrsAfterRefresh, attrsAvailable(client)); + } + finally { + GridClientFactory.stop(client.id(), false); + } + } + + /** + * @param client Client instance. + * @param includeCache If {@code true} then cache metrics should be considered + * and their consistency with node metrics should be asserted, otherwise consider only node metrics. + * @return {@code true} if node metrics available through this client, + * {@code false} otherwise. + * @throws GridClientException If data projection is not available. + */ + private boolean metricsAvailable(GridClient client, boolean includeCache) throws GridClientException { + if (includeCache) { + boolean node = nodeMetricsAvailable(client); + boolean cache = client.data(CACHE_NAME).cachedMetrics() != null; + + assertTrue("Inconsistency between cache and node metrics cache.", node == cache); + + return node && cache; + } + else + return nodeMetricsAvailable(client); + } + + /** + * @param client Client instance. + * @return {@code true} if node node metrics available through this client, + * {@code false} otherwise. + */ + private boolean nodeMetricsAvailable(GridClient client) throws GridClientException { + for (GridClientNode node : client.compute().nodes()) + if (node.metrics() != null) + return true; + + return false; + } + + /** + * @param client Client instance. + * @return {@code true} if node attributes available through this client, + * {@code false} otherwise. + */ + private boolean attrsAvailable(GridClient client) throws GridClientException { + for (GridClientNode node : client.compute().nodes()) + if (node.attributes() != null && !node.attributes().isEmpty()) + return true; + + return false; + } + + /** + * @param metricsCache Should metrics cache be enabled? + * @param attrsCache Should attributes cache be enabled? + * @return Client. + * @throws GridClientException In case of error. + */ + private GridClient client(boolean metricsCache, boolean attrsCache, + boolean autoFetchMetrics, boolean autoFetchAttrs) throws GridClientException { + GridClientDataConfiguration cache = new GridClientDataConfiguration(); + + cache.setName(CACHE_NAME); + + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setServers(Arrays.asList(HOST + ":" + BINARY_PORT)); + cfg.setEnableMetricsCache(metricsCache); + cfg.setEnableAttributesCache(attrsCache); + cfg.setAutoFetchMetrics(autoFetchMetrics); + cfg.setAutoFetchAttributes(autoFetchAttrs); + cfg.setDataConfigurations(Collections.singleton(cache)); + + return GridClientFactory.start(cfg); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(LOCAL); + cacheCfg.setName(CACHE_NAME); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setSwapEnabled(false); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost(HOST); + + assert cfg.getConnectorConfiguration() == null; + + ConnectorConfiguration clientCfg = new ConnectorConfiguration(); + + clientCfg.setPort(BINARY_PORT); + + cfg.setConnectorConfiguration(clientCfg); + + cfg.setCacheConfiguration(cacheCfg); + cfg.setDiscoverySpi(disco); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPropertiesConfigurationSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPropertiesConfigurationSelfTest.java index 8353f4c,0000000..6e2a1eb mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPropertiesConfigurationSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPropertiesConfigurationSelfTest.java @@@ -1,232 -1,0 +1,232 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.commons.io.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.balancer.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.springframework.context.support.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.apache.ignite.internal.client.GridClientConfiguration.*; + +/** + * Properties-based configuration self test. + */ +public class ClientPropertiesConfigurationSelfTest extends GridCommonAbstractTest { + /** + * Grid client spring configuration. + */ + private static final URL GRID_CLIENT_SPRING_CONFIG; + + /** + * Grid client properties-based configuration. + */ + private static final URL GRID_CLIENT_CONFIG; + + /** + * + */ + static { + GRID_CLIENT_SPRING_CONFIG = - U.resolveGridGainUrl("/modules/clients/config/grid-client-spring-config.xml"); ++ U.resolveIgniteUrl("/modules/clients/config/grid-client-spring-config.xml"); + - GRID_CLIENT_CONFIG = U.resolveGridGainUrl("/modules/clients/config/grid-client-config.properties"); ++ GRID_CLIENT_CONFIG = U.resolveIgniteUrl("/modules/clients/config/grid-client-config.properties"); + } + + /** + * Test client configuration loaded from the properties. + * + * @throws Exception In case of exception. + */ + public void testCreation() throws Exception { + // Validate default configuration. + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setServers(Arrays.asList("localhost:11211")); + + validateConfig(0, cfg); + + // Validate default properties-based configuration. + cfg = new GridClientConfiguration(); + + cfg.setServers(Arrays.asList("localhost:11211")); + + validateConfig(0, cfg); + + // Validate loaded configuration. + Properties props = loadProperties(1, GRID_CLIENT_CONFIG); + validateConfig(0, new GridClientConfiguration(props)); + + // Validate loaded configuration with changed key prefixes. + Properties props2 = new Properties(); + + for (Map.Entry<Object, Object> e : props.entrySet()) + props2.put("new." + e.getKey(), e.getValue()); + + validateConfig(0, new GridClientConfiguration("new.gg.client", props2)); + validateConfig(0, new GridClientConfiguration("new.gg.client.", props2)); + + // Validate loaded test configuration. + File tmp = uncommentProperties(GRID_CLIENT_CONFIG); + + props = loadProperties(25, tmp.toURI().toURL()); + validateConfig(2, new GridClientConfiguration(props)); + + // Validate loaded test configuration with changed key prefixes. + props2 = new Properties(); + + for (Map.Entry<Object, Object> e : props.entrySet()) + props2.put("new." + e.getKey(), e.getValue()); + + validateConfig(2, new GridClientConfiguration("new.gg.client", props2)); + validateConfig(2, new GridClientConfiguration("new.gg.client.", props2)); + + // Validate loaded test configuration with empty key prefixes. + props2 = new Properties(); + + for (Map.Entry<Object, Object> e : props.entrySet()) + props2.put(e.getKey().toString().replace("gg.client.", ""), e.getValue()); + + validateConfig(2, new GridClientConfiguration("", props2)); + validateConfig(2, new GridClientConfiguration(".", props2)); + } + + /** + * Validate spring client configuration. + * + * @throws Exception In case of any exception. + */ + public void testSpringConfig() throws Exception { + GridClientConfiguration cfg = new FileSystemXmlApplicationContext( + GRID_CLIENT_SPRING_CONFIG.toString()).getBean(GridClientConfiguration.class); + + assertEquals(Arrays.asList("127.0.0.1:11211"), new ArrayList<>(cfg.getServers())); + assertNull(cfg.getSecurityCredentialsProvider()); + + Collection<GridClientDataConfiguration> dataCfgs = cfg.getDataConfigurations(); + + assertEquals(1, dataCfgs.size()); + + GridClientDataConfiguration dataCfg = dataCfgs.iterator().next(); + + assertEquals("partitioned", dataCfg.getName()); + + assertNotNull(dataCfg.getPinnedBalancer()); + assertEquals(GridClientRandomBalancer.class, dataCfg.getPinnedBalancer().getClass()); + + assertNotNull(dataCfg.getAffinity()); + assertEquals(GridClientPartitionAffinity.class, dataCfg.getAffinity().getClass()); + } + + /** + * Uncomment properties. + * + * @param url Source to uncomment client properties for. + * @return Temporary file with uncommented client properties. + * @throws IOException In case of IO exception. + */ + private File uncommentProperties(URL url) throws IOException { + InputStream in = url.openStream(); + + assertNotNull(in); + + LineIterator it = IOUtils.lineIterator(in, "UTF-8"); + Collection<String> lines = new ArrayList<>(); + + while (it.hasNext()) + lines.add(it.nextLine().replace("#gg.client.", "gg.client.")); + + IgniteUtils.closeQuiet(in); + + File tmp = File.createTempFile(UUID.randomUUID().toString(), "properties"); + + tmp.deleteOnExit(); + + FileUtils.writeLines(tmp, lines); + + return tmp; + } + + /** + * Load properties from the url. + * + * @param expLoaded Expected number of loaded properties. + * @param url URL to load properties from. + * @return Loaded properties. + * @throws IOException In case of IO exception. + */ + private Properties loadProperties(int expLoaded, URL url) throws IOException { + InputStream in = url.openStream(); + + Properties props = new Properties(); + + assertEquals(0, props.size()); + + props.load(in); + + assertEquals(expLoaded, props.size()); + + IgniteUtils.closeQuiet(in); + + return props; + } + + /** + * Validate loaded configuration. + * + * @param expDataCfgs Expected data configurations count. + * @param cfg Client configuration to validate. + */ + private void validateConfig(int expDataCfgs, GridClientConfiguration cfg) { + assertEquals(GridClientRandomBalancer.class, cfg.getBalancer().getClass()); + assertEquals(10000, cfg.getConnectTimeout()); + assertEquals(null, cfg.getSecurityCredentialsProvider()); + + assertEquals(expDataCfgs, cfg.getDataConfigurations().size()); + + if (expDataCfgs == 2) { + GridClientDataConfiguration nullCfg = cfg.getDataConfiguration(null); + + assertEquals(null, nullCfg.getName()); + assertEquals(null, nullCfg.getAffinity()); + assertEquals(GridClientRandomBalancer.class, nullCfg.getPinnedBalancer().getClass()); + + GridClientDataConfiguration partCfg = cfg.getDataConfiguration("partitioned"); + + assertEquals("partitioned", partCfg.getName()); + assertEquals(GridClientPartitionAffinity.class, partCfg.getAffinity().getClass()); + assertEquals(GridClientRoundRobinBalancer.class, partCfg.getPinnedBalancer().getClass()); + } + + assertEquals(DFLT_MAX_CONN_IDLE_TIME, cfg.getMaxConnectionIdleTime()); + assertEquals(GridClientProtocol.TCP, cfg.getProtocol()); + assertEquals(Arrays.asList("localhost:11211"), new ArrayList<>(cfg.getServers())); + assertEquals(true, cfg.isEnableAttributesCache()); + assertEquals(true, cfg.isEnableMetricsCache()); + assertEquals(true, cfg.isTcpNoDelay()); + assertEquals(null, cfg.getSslContextFactory(), null); + assertEquals(DFLT_TOP_REFRESH_FREQ, cfg.getTopologyRefreshFrequency()); + } +}