http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestRestServer.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestRestServer.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestRestServer.java new file mode 100644 index 0000000..b134591 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestRestServer.java @@ -0,0 +1,275 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.client;

import org.apache.ignite.*;
import org.apache.ignite.internal.client.marshaller.*;
import org.apache.ignite.internal.client.marshaller.optimized.*;
import org.apache.ignite.internal.processors.rest.client.message.*;
import org.apache.ignite.internal.processors.rest.protocols.tcp.*;
import org.apache.ignite.internal.util.nio.*;
import org.jetbrains.annotations.*;

import java.net.*;
import java.nio.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
 * Stub TCP REST server for client tests. It listens on {@code 127.0.0.1}, counts connects and
 * disconnects, and answers ping, handshake, authentication and topology requests with canned
 * responses. It can be switched into a mode where every new connection is closed immediately.
 */
public class ClientTestRestServer {
    /** First port of the static topology built by this class. */
    public static final int FIRST_SERVER_PORT = 11000;

    /** Number of node beans in the static topology. */
    public static final int SERVERS_CNT = 5;

    /** Empty session token that is put into every response. */
    private static final byte[] EMPTY_SES_TOKEN = new byte[] {};

    /** Static topology returned in reply to topology requests. */
    private static final Collection<GridClientNodeBean> top = new ArrayList<>();

    /*
     * Fills the static topology with one node bean per port in
     * [FIRST_SERVER_PORT, FIRST_SERVER_PORT + SERVERS_CNT).
     */
    static {
        for (int port = FIRST_SERVER_PORT; port < FIRST_SERVER_PORT + SERVERS_CNT; port++) {
            GridClientNodeBean node = new GridClientNodeBean();

            node.setNodeId(UUID.randomUUID());
            node.setConsistentId("127.0.0.1:" + port);
            node.setTcpPort(port);
            node.setTcpAddresses(Arrays.asList("127.0.0.1"));

            top.add(node);
        }
    }

    /** Port this server listens on. */
    private final int port;

    /** If {@code true}, every new connection is closed right after it is accepted. */
    private volatile boolean failOnConnect;

    /** Logger. */
    private final IgniteLogger log;

    /** Total number of accepted connections. */
    private final AtomicInteger connCnt = new AtomicInteger();

    /** Number of accepted connections that were not closed on connect. */
    private final AtomicInteger succConnCnt = new AtomicInteger();

    /** Number of disconnect notifications received. */
    private final AtomicInteger disconnCnt = new AtomicInteger();

    /** NIO server, created by {@link #start()}. */
    private GridNioServer<GridClientMessage> srv;

    /** Thread pool handed to the async notify filter; owned by this server and shut down in {@link #stop()}. */
    private ExecutorService notifyExec;

    /** Most recently connected session. */
    private volatile GridNioSession lastSes;

    /**
     * @param port Port to listen on.
     * @param failOnConnect If {@code true} then the server will close a connection immediately after connect.
     * @param log Log.
     */
    public ClientTestRestServer(int port, boolean failOnConnect, IgniteLogger log) {
        this.port = port;
        this.failOnConnect = failOnConnect;
        this.log = log;
    }

    /**
     * @return Port number.
     */
    public int getPort() {
        return port;
    }

    /**
     * Builds and starts the NIO server.
     *
     * @throws IgniteCheckedException If failed.
     */
    public void start() throws IgniteCheckedException {
        String gridName = "test";

        // Keep a reference to the pool: the server does not own it, so we must shut it down ourselves.
        ExecutorService exec = Executors.newFixedThreadPool(2);

        boolean built = false;

        try {
            srv = GridNioServer.<GridClientMessage>builder()
                .address(InetAddress.getByName("127.0.0.1"))
                .port(port)
                .listener(new TestListener())
                .logger(log)
                .selectorCount(2)
                .gridName(gridName)
                .byteOrder(ByteOrder.nativeOrder())
                .tcpNoDelay(true)
                .directBuffer(false)
                .filters(
                    new GridNioAsyncNotifyFilter(gridName, exec, log),
                    new GridNioCodecFilter(new TestParser(), log, false)
                )
                .build();

            built = true;
        }
        catch (UnknownHostException e) {
            throw new IgniteCheckedException("Failed to determine localhost address.", e);
        }
        finally {
            // Do not leak pool threads if the server could not be built.
            if (!built)
                exec.shutdown();
        }

        notifyExec = exec;

        srv.start();
    }

    /**
     * Stops the server and releases the notification thread pool created by {@link #start()}.
     */
    public void stop() {
        assert srv != null;

        srv.stop();

        ExecutorService exec = notifyExec;

        if (exec != null) {
            notifyExec = null;

            exec.shutdown();
        }
    }

    /**
     * @return Number of connections opened to this server.
     */
    public int getConnectCount() {
        return connCnt.get();
    }

    /**
     * @return Number of successful connections opened to this server.
     */
    public int getSuccessfulConnectCount() {
        return succConnCnt.get();
    }

    /**
     * @return Number of connections with this server closed by clients.
     */
    public int getDisconnectCount() {
        return disconnCnt.get();
    }

    /**
     * Closes the most recently connected session, makes the server close all subsequent
     * connections on connect, and resets all counters.
     */
    public void fail() {
        assert lastSes != null;

        lastSes.close();

        failOnConnect = true;

        resetCounters();
    }

    /**
     * Makes the server accept new connections again after {@link #fail()}.
     */
    public void repair() {
        failOnConnect = false;
    }

    /**
     * Resets all counters.
     */
    public void resetCounters() {
        connCnt.set(0);
        succConnCnt.set(0);
        disconnCnt.set(0);
    }

    /**
     * Prepares a successful response stub carrying the client ID and request ID of the given message.
     *
     * @param msg Message to respond to.
     * @return Response.
     */
    private static GridClientResponse makeResponseFor(GridClientMessage msg) {
        GridClientResponse res = new GridClientResponse();

        res.clientId(msg.clientId());
        res.requestId(msg.requestId());
        res.successStatus(GridClientResponse.STATUS_SUCCESS);
        res.sessionToken(EMPTY_SES_TOKEN);

        return res;
    }

    /**
     * Test listener: maintains the counters and answers supported requests.
     */
    private class TestListener extends GridNioServerListenerAdapter<GridClientMessage> {
        /** {@inheritDoc} */
        @Override public void onConnected(GridNioSession ses) {
            lastSes = ses;

            connCnt.incrementAndGet();

            if (failOnConnect)
                ses.close();
            else
                succConnCnt.incrementAndGet();
        }

        /** {@inheritDoc} */
        @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
            disconnCnt.incrementAndGet();
        }

        /** {@inheritDoc} */
        @Override public void onMessage(GridNioSession ses, GridClientMessage msg) {
            if (msg == GridClientPingPacket.PING_MESSAGE)
                ses.send(GridClientPingPacket.PING_MESSAGE);
            else if (msg instanceof GridClientAuthenticationRequest)
                ses.send(makeResponseFor(msg));
            else if (msg instanceof GridClientTopologyRequest) {
                GridClientResponse res = makeResponseFor(msg);

                res.result(top);

                ses.send(res);
            }
            else if (msg instanceof GridClientHandshakeRequest)
                ses.send(GridClientHandshakeResponse.OK);

            // Any other message type is left unanswered.
        }

        /** {@inheritDoc} */
        @Override public void onSessionWriteTimeout(GridNioSession ses) {
            ses.close();
        }

        /** {@inheritDoc} */
        @Override public void onSessionIdleTimeout(GridNioSession ses) {
            ses.close();
        }
    }

    /**
     * REST parser that always uses a single optimized marshaller instance.
     */
    private static class TestParser extends GridTcpRestParser {
        /** Marshaller returned for every session. */
        private final GridClientMarshaller marsh = new GridClientOptimizedMarshaller();

        /** {@inheritDoc} */
        @Override protected GridClientMarshaller marshaller(GridNioSession ses) {
            return marsh;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTopologyCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTopologyCacheSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTopologyCacheSelfTest.java new file mode 100644 index 0000000..6821209 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTopologyCacheSelfTest.java @@ -0,0 +1,291 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.client;

import org.apache.ignite.cache.*;
import org.apache.ignite.client.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.client.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;

import java.util.*;

import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;

/**
 * Tests topology caching.
 */
public class ClientTopologyCacheSelfTest extends GridCommonAbstractTest {
    static {
        // Override default port.
        System.setProperty(IGNITE_JETTY_PORT, Integer.toString(8081));
    }

    /** Host. */
    public static final String HOST = "127.0.0.1";

    /** Port. */
    public static final int BINARY_PORT = 11212;

    /** Cache name. */
    private static final String CACHE_NAME = "cache";

    /** Shared IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGrid();
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopGrid();
    }

    /**
     * Checks the cache-enabled/disabled combinations with auto-fetch switched off.
     *
     * @throws Exception If failed.
     */
    public void testTopologyCache() throws Exception {
        // Flag order in every call below:
        // metricsCache, attrsCache, autoFetchMetrics, autoFetchAttrs,
        // metricsBeforeRefresh, attrsBeforeRefresh, metricsAfterRefresh, attrsAfterRefresh.

        // Both caches enabled.
        testTopologyCache(true, true, false, false, false, false, true, true);

        // Both caches disabled.
        testTopologyCache(false, false, false, false, false, false, false, false);

        // Metrics cache only.
        testTopologyCache(true, false, false, false, false, false, true, false);

        // Attributes cache only.
        testTopologyCache(false, true, false, false, false, false, false, true);
    }

    /**
     * Checks the auto-fetch combinations with both caches switched on.
     *
     * @throws Exception If failed.
     */
    public void testAutofetch() throws Exception {
        // Flag order in every call below:
        // metricsCache, attrsCache, autoFetchMetrics, autoFetchAttrs,
        // metricsBeforeRefresh, attrsBeforeRefresh, metricsAfterRefresh, attrsAfterRefresh.

        // Auto-fetch of both metrics and attributes.
        testTopologyCache(true, true, true, true, true, true, true, true);

        // Auto-fetch of attributes only.
        testTopologyCache(true, true, false, true, false, true, true, true);

        // Auto-fetch of metrics only.
        testTopologyCache(true, true, true, false, true, false, true, true);

        // No auto-fetch.
        testTopologyCache(true, true, false, false, false, false, true, true);
    }

    /**
     * Starts a new client with the given caching configuration, refreshes topology and
     * compares availability of node metrics and attributes with the expected flags
     * before and after the refresh. The client is always stopped afterwards.
     *
     * @param metricsCache Should metrics be cached?
     * @param attrsCache Should attributes be cached?
     * @param autoFetchMetrics Should metrics be fetched automatically?
     * @param autoFetchAttrs Should attributes be fetched automatically?
     * @param metricsBeforeRefresh Should metrics be available before topology refresh?
     * @param attrsBeforeRefresh Should attributes be available before topology refresh?
     * @param metricsAfterRefresh Should metrics be available after topology refresh?
     * @param attrsAfterRefresh Should attributes be available after topology refresh?
     * @throws Exception If failed.
     */
    private void testTopologyCache(boolean metricsCache, boolean attrsCache,
        boolean autoFetchMetrics, boolean autoFetchAttrs,
        boolean metricsBeforeRefresh, boolean attrsBeforeRefresh,
        boolean metricsAfterRefresh, boolean attrsAfterRefresh) throws Exception {
        GridClient c = client(metricsCache, attrsCache, autoFetchMetrics, autoFetchAttrs);

        try {
            // Exclude cache metrics because there is no background refresh for them.
            boolean metricsBefore = metricsAvailable(c, false);

            assertEquals(metricsBeforeRefresh, metricsBefore);
            assertEquals(attrsBeforeRefresh, attrsAvailable(c));

            c.compute().refreshTopology(true, true);
            c.data(CACHE_NAME).metrics();

            boolean metricsAfter = metricsAvailable(c, true);

            assertEquals(metricsAfterRefresh, metricsAfter);
            assertEquals(attrsAfterRefresh, attrsAvailable(c));
        }
        finally {
            GridClientFactory.stop(c.id(), false);
        }
    }

    /**
     * @param client Client instance.
     * @param includeCache If {@code true} then cache metrics are considered as well and their
     *      consistency with node metrics is asserted, otherwise only node metrics are considered.
     * @return {@code true} if node metrics are available through this client,
     *      {@code false} otherwise.
     * @throws GridClientException If data projection is not available.
     */
    private boolean metricsAvailable(GridClient client, boolean includeCache) throws GridClientException {
        if (!includeCache)
            return nodeMetricsAvailable(client);

        boolean nodeMetrics = nodeMetricsAvailable(client);
        boolean cacheMetrics = client.data(CACHE_NAME).cachedMetrics() != null;

        assertTrue("Inconsistency between cache and node metrics cache.", nodeMetrics == cacheMetrics);

        return nodeMetrics && cacheMetrics;
    }

    /**
     * @param client Client instance.
     * @return {@code true} if at least one node exposes metrics through this client,
     *      {@code false} otherwise.
     * @throws GridClientException If the node list cannot be obtained.
     */
    private boolean nodeMetricsAvailable(GridClient client) throws GridClientException {
        for (GridClientNode n : client.compute().nodes()) {
            if (n.metrics() != null)
                return true;
        }

        return false;
    }

    /**
     * @param client Client instance.
     * @return {@code true} if at least one node exposes a non-empty attribute map through this client,
     *      {@code false} otherwise.
     * @throws GridClientException If the node list cannot be obtained.
     */
    private boolean attrsAvailable(GridClient client) throws GridClientException {
        for (GridClientNode n : client.compute().nodes()) {
            if (n.attributes() != null && !n.attributes().isEmpty())
                return true;
        }

        return false;
    }

    /**
     * Starts a client connected to {@link #HOST}:{@link #BINARY_PORT}.
     *
     * @param metricsCache Should metrics cache be enabled?
     * @param attrsCache Should attributes cache be enabled?
     * @param autoFetchMetrics Should metrics be fetched automatically?
     * @param autoFetchAttrs Should attributes be fetched automatically?
     * @return Client.
     * @throws GridClientException In case of error.
     */
    private GridClient client(boolean metricsCache, boolean attrsCache,
        boolean autoFetchMetrics, boolean autoFetchAttrs) throws GridClientException {
        GridClientDataConfiguration dataCfg = new GridClientDataConfiguration();

        dataCfg.setName(CACHE_NAME);

        GridClientConfiguration clientCfg = new GridClientConfiguration();

        clientCfg.setServers(Arrays.asList(HOST + ":" + BINARY_PORT));
        clientCfg.setEnableMetricsCache(metricsCache);
        clientCfg.setEnableAttributesCache(attrsCache);
        clientCfg.setAutoFetchMetrics(autoFetchMetrics);
        clientCfg.setAutoFetchAttributes(autoFetchAttrs);
        clientCfg.setDataConfigurations(Collections.singleton(dataCfg));

        return GridClientFactory.start(clientCfg);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        // Local cache the client talks to.
        CacheConfiguration ccfg = defaultCacheConfiguration();

        ccfg.setCacheMode(LOCAL);
        ccfg.setName(CACHE_NAME);
        ccfg.setWriteSynchronizationMode(FULL_SYNC);
        ccfg.setSwapEnabled(false);

        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();

        discoSpi.setIpFinder(IP_FINDER);

        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);

        igniteCfg.setLocalHost(HOST);

        assert igniteCfg.getClientConnectionConfiguration() == null;

        // REST TCP endpoint for the client.
        ClientConnectionConfiguration connCfg = new ClientConnectionConfiguration();

        connCfg.setRestTcpPort(BINARY_PORT);

        igniteCfg.setClientConnectionConfiguration(connCfg);

        igniteCfg.setCacheConfiguration(ccfg);
        igniteCfg.setDiscoverySpi(discoSpi);

        return igniteCfg;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/HashMapStore.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/HashMapStore.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/HashMapStore.java new file mode 100644 index 0000000..8de945a --- /dev/null +++ 
b/modules/clients/src/test/java/org/apache/ignite/internal/client/HashMapStore.java @@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.client;

import org.apache.ignite.cache.store.*;
import org.apache.ignite.lang.*;

import javax.cache.*;
import java.util.*;

/**
 * Simple cache store emulation that keeps all entries in an in-memory {@link HashMap}.
 */
public class HashMapStore extends CacheStoreAdapter {
    /** Backing map holding the stored key-value pairs. */
    private final Map<Object, Object> map = new HashMap<>();

    /**
     * Passes every stored key-value pair to the given closure. The {@code args} are not used.
     *
     * {@inheritDoc}
     */
    @Override public void loadCache(IgniteBiInClosure c, Object... args) {
        Iterator<Map.Entry<Object, Object>> it = map.entrySet().iterator();

        while (it.hasNext()) {
            Map.Entry<Object, Object> entry = it.next();

            c.apply(entry.getKey(), entry.getValue());
        }
    }

    /** {@inheritDoc} */
    @Override public Object load(Object key) {
        return map.get(key);
    }

    /** {@inheritDoc} */
    @Override public void write(Cache.Entry e) {
        Object key = e.getKey();
        Object val = e.getValue();

        map.put(key, val);
    }

    /** {@inheritDoc} */
    @Override public void delete(Object key) {
        map.remove(key);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/SleepTestTask.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/SleepTestTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/SleepTestTask.java new file mode 100644 index 0000000..ace975e --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/SleepTestTask.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.internal.client;

import org.apache.ignite.*;
import org.apache.ignite.compute.*;

import java.util.*;

import static org.apache.ignite.compute.ComputeJobResultPolicy.*;

/**
 * Test task that splits into a single job. The job sleeps for 10 seconds and then returns
 * the length of the task argument ({@code 0} for a {@code null} argument); the task reduces
 * job results to their sum.
 */
public class SleepTestTask extends ComputeTaskSplitAdapter<String, Integer> {
    /** {@inheritDoc} */
    @Override public Collection<? extends ComputeJob> split(int gridSize, String arg)
        throws IgniteCheckedException {
        return Collections.singleton(new ComputeJobAdapter(arg) {
            @Override public Object execute() {
                try {
                    Thread.sleep(10000);

                    String val = argument(0);

                    return val == null ? 0 : val.length();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can still observe
                    // the interruption after it has been translated into an unchecked exception.
                    Thread.currentThread().interrupt();

                    throw new RuntimeException(e);
                }
            }
        });
    }

    /** {@inheritDoc} */
    @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
        int sum = 0;

        for (ComputeJobResult res : results)
            sum += res.<Integer>getData();

        return sum;
    }

    /**
     * Requests failover for a job that finished with an exception and waits otherwise.
     *
     * {@inheritDoc}
     */
    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException {
        if (res.getException() != null)
            return FAILOVER;

        return WAIT;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/TaskSingleJobSplitAdapter.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/TaskSingleJobSplitAdapter.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/TaskSingleJobSplitAdapter.java new file mode 100644 index 0000000..3289e8c --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/TaskSingleJobSplitAdapter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal.client;

import org.apache.ignite.*;
import org.apache.ignite.compute.*;

import java.util.*;

/**
 * Adapter for {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} whose {@code split(...)}
 * always produces exactly one job; that job delegates to {@link #executeJob(int, Object)}.
 * Intended for tasks that always split into a single job.
 *
 * @param <T> Type of the task execution argument.
 * @param <R> Type of the task result returning from {@link org.apache.ignite.compute.ComputeTask#reduce(List)} method.
 */
public abstract class TaskSingleJobSplitAdapter<T, R> extends ComputeTaskSplitAdapter<T, R> {
    /** Empty constructor. */
    protected TaskSingleJobSplitAdapter() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override protected Collection<? extends ComputeJob> split(final int gridSize, final T arg) throws IgniteCheckedException {
        // The only job simply forwards to the subclass implementation.
        ComputeJob singleJob = new ComputeJobAdapter() {
            @Override public Object execute() throws IgniteCheckedException {
                return executeJob(gridSize, arg);
            }
        };

        return Collections.singleton(singleJob);
    }

    /**
     * Returns the data of the single job result, or fails if that job was cancelled.
     *
     * {@inheritDoc}
     */
    @Override public R reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
        assert results.size() == 1;

        ComputeJobResult jobRes = results.get(0);

        if (!jobRes.isCancelled())
            return jobRes.getData();

        throw new IgniteCheckedException("Reduce receives failed job.");
    }

    /**
     * Executes this task's job.
     *
     * @param gridSize Grid size value that was passed to {@code split(...)}.
     * @param arg Task execution argument. Can be {@code null}.
     * @return Job execution result (possibly {@code null}). It is returned from the
     *      {@code execute()} method of the single job created by {@code split(...)}.
     * @throws IgniteCheckedException If job execution caused an exception.
     */
    protected abstract Object executeJob(int gridSize, T arg) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientCacheFlagsCodecTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientCacheFlagsCodecTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientCacheFlagsCodecTest.java new file mode 100644 index 0000000..8a2ff25 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientCacheFlagsCodecTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.internal.client.impl;

import junit.framework.*;
import org.apache.ignite.internal.client.*;
import org.apache.ignite.internal.client.impl.connection.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.rest.handlers.cache.*;
import org.apache.ignite.internal.util.typedef.*;

import java.util.*;

import static org.apache.ignite.internal.client.GridClientCacheFlag.*;

/**
 * Tests conversions between GridClientCacheFlag and CacheFlag.
 */
public class ClientCacheFlagsCodecTest extends TestCase {
    /**
     * Tests that every client flag except {@code KEEP_PORTABLES} is encoded to a non-zero bit mask
     * which decodes to exactly one server flag with the same name.
     */
    public void testEncodingDecodingFullness() {
        for (GridClientCacheFlag clientFlag : GridClientCacheFlag.values()) {
            // KEEP_PORTABLES is not checked by this test.
            if (clientFlag != KEEP_PORTABLES) {
                int mask = GridClientConnection.encodeCacheFlags(Collections.singleton(clientFlag));

                assertTrue(mask != 0);

                CacheFlag[] decoded = GridCacheCommandHandler.parseCacheFlags(mask);

                assertEquals(1, decoded.length);

                assertEquals(clientFlag.name(), decoded[0].name());
            }
        }
    }

    /**
     * Tests that groups of client flags can be correctly converted to corresponding server flag groups.
     */
    public void testGroupEncodingDecoding() {
        // All flags.
        doTestGroup(GridClientCacheFlag.values());

        // No flags.
        doTestGroup();

        // A single flag.
        doTestGroup(GridClientCacheFlag.INVALIDATE);
    }

    /**
     * Encodes the given flags, decodes the result and checks that the decoded server flags
     * match the client flags by name ({@code KEEP_PORTABLES} is expected to be dropped).
     *
     * @param flags Client flags to be encoded, decoded and checked.
     */
    private void doTestGroup(GridClientCacheFlag... flags) {
        EnumSet<GridClientCacheFlag> clientFlags;

        if (F.isEmpty(flags))
            clientFlags = EnumSet.noneOf(GridClientCacheFlag.class);
        else
            clientFlags = EnumSet.copyOf(Arrays.asList(flags));

        int mask = GridClientConnection.encodeCacheFlags(clientFlags);

        CacheFlag[] decoded = GridCacheCommandHandler.parseCacheFlags(mask);

        int expCnt = clientFlags.size();

        if (clientFlags.contains(KEEP_PORTABLES))
            expCnt--;

        assertEquals(expCnt, decoded.length);

        for (CacheFlag srvFlag : decoded) {
            GridClientCacheFlag sameName = GridClientCacheFlag.valueOf(srvFlag.name());

            assertTrue(clientFlags.contains(sameName));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientComputeImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientComputeImplSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientComputeImplSelfTest.java new file mode 100644 index 0000000..1383096 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientComputeImplSelfTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.testframework.GridTestUtils.*; + +/** + * Simple unit test for GridClientComputeImpl which checks method parameters. + * It tests only those methods that can produce assertion underneath upon incorrect arguments. + */ +public class ClientComputeImplSelfTest extends GridCommonAbstractTest { + /** Client compute under test, obtained from {@code allocateInstance0(GridClientComputeImpl.class)}. */ + private GridClientCompute compute = allocateInstance0(GridClientComputeImpl.class); + + /** + * @throws Exception If failed. + */ + public void testProjection_byGridClientNode() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.projection((GridClientNode)null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: node"); + } + + /** + * @throws Exception If failed. + */ + public void testExecute() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.execute(null, null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: taskName"); + } + + /** + * @throws Exception If failed. + */ + public void testExecuteAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.executeAsync(null, null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: taskName"); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityExecute() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.affinityExecute(null, "cache", "key", null); + } + }, NullPointerException.class, "Ouch! 
Argument cannot be null: taskName"); + } + + /** + * @throws Exception If {@code affinityExecuteAsync} does not reject a null task name. + */ + public void testAffinityExecuteAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.affinityExecuteAsync(null, "cache", "key", null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: taskName"); + } + + /** + * @throws Exception If failed. + */ + public void testNode() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.node(null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: id"); + } + + /** + * @throws Exception If failed. + */ + public void testNodesByIds() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.nodes((Collection<UUID>)null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: ids"); + } + + /** + * @throws Exception If failed. + */ + public void testNodesByFilter() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.nodes((GridClientPredicate<GridClientNode>)null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: filter"); + } + + /** + * @throws Exception If failed. + */ + public void testRefreshNodeById() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.refreshNode((UUID)null, false, false); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: id"); + } + + /** + * @throws Exception If failed. + */ + public void testRefreshNodeByIdAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.refreshNodeAsync((UUID)null, false, false); + } + }, NullPointerException.class, "Ouch! 
Argument cannot be null: id"); + } + + /** + * @throws Exception If failed. + */ + public void testRefreshNodeByIp() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.refreshNode((String)null, false, false); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: ip"); + } + + /** + * @throws Exception If {@code refreshNodeAsync} does not reject a null IP. + */ + public void testRefreshNodeByIpAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return compute.refreshNodeAsync((String)null, false, false); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: ip"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientDataImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientDataImplSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientDataImplSelfTest.java new file mode 100644 index 0000000..6e2ca45 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientDataImplSelfTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.testframework.GridTestUtils.*; + +/** + * Simple unit test for GridClientDataImpl which checks method parameters. + */ +public class ClientDataImplSelfTest extends GridCommonAbstractTest { + /** Client data under test, obtained from {@code allocateInstance0(GridClientDataImpl.class)}. */ + private GridClientData data = allocateInstance0(GridClientDataImpl.class); + + /** + * @throws Exception If failed. + */ + public void testPut() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.put(null, "val"); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: key"); + + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.put("key", null); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: val"); + } + + /** + * @throws Exception If failed. + */ + public void testPutAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.putAsync(null, "val"); + + return null; + } + }, NullPointerException.class, "Ouch! 
Argument cannot be null: key"); + + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.putAsync("key", null); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: val"); + } + + /** + * @throws Exception If failed. + */ + public void testPutAll() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.putAll(null); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: entries"); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.putAllAsync(null); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: entries"); + } + + /** + * @throws Exception If failed. + */ + public void testGet() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.get(null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: key"); + } + + /** + * @throws Exception If failed. + */ + public void testGetAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.getAsync(null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: key"); + } + + /** + * @throws Exception If failed. + */ + public void testGetAll() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.getAll(null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: keys"); + } + + /** + * @throws Exception If failed. 
+ */ + public void testGetAllAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.getAllAsync(null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: keys"); + } + + /** + * @throws Exception If failed. + */ + public void testRemove() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.remove(null); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: key"); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.removeAsync(null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: key"); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAll() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.removeAll(null); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: keys"); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAllAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.removeAllAsync(null); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: keys"); + } + + /** + * @throws Exception If failed. + */ + public void testReplace() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.replace(null, "val"); + + return null; + } + }, NullPointerException.class, "Ouch! 
Argument cannot be null: key"); + + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.replace("key", null); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: val"); + } + + /** + * @throws Exception If failed. + */ + public void testReplaceAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.replaceAsync(null, "val"); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: key"); + + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.replaceAsync("key", null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: val"); + } + + /** + * @throws Exception If failed. + */ + public void testCas() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + data.cas(null, "val1", "val2"); + + return null; + } + }, NullPointerException.class, "Ouch! Argument cannot be null: key"); + } + + /** + * @throws Exception If failed. + */ + public void testCasAsync() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.casAsync(null, "val1", "val2"); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: key"); + } + + /** + * @throws Exception If failed. + */ + public void testAffinity() throws Exception { + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return data.affinity(null); + } + }, NullPointerException.class, "Ouch! 
Argument cannot be null: key"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientFutureAdapterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientFutureAdapterSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientFutureAdapterSelfTest.java new file mode 100644 index 0000000..6c42e50 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientFutureAdapterSelfTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Grid client future implementation self test. + */ +public class ClientFutureAdapterSelfTest extends GridCommonAbstractTest { + /** + * Checks {@code isDone()} before and after {@code onDone}, and for futures constructed with a value or an error. 
+ */ + public void testFinished() { + GridClientFutureAdapter<Integer> fut = new GridClientFutureAdapter<>(); + + assertFalse(fut.isDone()); + + fut.onDone(0); + + assertTrue(fut.isDone()); + assertTrue(new GridClientFutureAdapter<>(0).isDone()); + assertTrue(new GridClientFutureAdapter<Integer>(new GridClientException("Test grid exception.")).isDone()); + assertTrue(new GridClientFutureAdapter<Integer>(new RuntimeException("Test runtime exception.")).isDone()); + } + + /** + * Test chained futures behaviour. + * + * @throws org.apache.ignite.internal.client.GridClientException On any exception. + */ + public void testChains() throws GridClientException { + // Synchronous notifications. + testChains(1, 100); + testChains(10, 10); + testChains(100, 1); + testChains(1000, 0); + } + + /** + * Test chained future in certain conditions. + * + * @param chainSize Futures chain size. + * @param waitDelay Delay to wait each future in the chain becomes done. + * @throws GridClientException In case of any exception + */ + private void testChains(int chainSize, long waitDelay) throws GridClientException { + /* Base future to chain from. */ + GridClientFutureAdapter<Integer> fut = new GridClientFutureAdapter<>(); + + /* Collection of chained futures: fut->chained[0]->chained[1]->...->chained[chainSize - 1] */ + List<GridClientFutureAdapter<Integer>> chained = new ArrayList<>(); + + GridClientFutureAdapter<Integer> cur = fut; + + for (int i = 0; i < chainSize; i++) { + cur = cur.chain(new GridClientFutureCallback<Integer, Integer>() { + @Override public Integer onComplete(GridClientFuture<Integer> f) throws GridClientException { + assertTrue("Expects callback future is finished.", f.isDone()); + + return f.get() + 1; + } + }); + + chained.add(cur); + } + + long start; + + /* Validate not-finished futures in chain. 
*/ + for (GridClientFuture<Integer> f : chained) { + assertFalse(f.isDone()); + + start = System.currentTimeMillis(); + + try { + f.get(waitDelay, TimeUnit.MILLISECONDS); + + fail("Expects chained future not finished yet."); + } + catch (GridClientFutureTimeoutException ignore) { + /* No op: expects chained future not finished yet. */ + } + + assertTrue(System.currentTimeMillis() - start >= waitDelay); + } + + /* Measure how long completing the whole chain of 'chainSize' futures takes. */ + start = System.currentTimeMillis(); + + fut.onDone(0); + assertEquals("Check chain-based increment value.", chainSize, chained.get(chainSize - 1).get().intValue()); + + info("Time consumption for " + chainSize + " chained futures: " + (System.currentTimeMillis() - start)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java new file mode 100644 index 0000000..77a63e2 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction.*; + +/** + * Client's partitioned affinity tests. + */ +public class ClientPartitionAffinitySelfTest extends GridCommonAbstractTest { + /** Hash ID resolver that uses the node ID as the hash ID. */ + private static final GridClientPartitionAffinity.HashIdResolver HASH_ID_RSLVR = + new GridClientPartitionAffinity.HashIdResolver() { + @Override public Object getHashId(GridClientNode node) { + return node.nodeId(); + } + }; + + /** + * Test predefined affinity with the node-ID hash resolver - must be ported to other clients. + */ + @SuppressWarnings("UnaryPlus") + public void testPredefined() throws Exception { + // Use Md5 hasher for this test. 
+ GridClientPartitionAffinity aff = new GridClientPartitionAffinity(); + + getTestResources().inject(aff); + + aff.setHashIdResolver(HASH_ID_RSLVR); + + List<GridClientNode> nodes = new ArrayList<>(); + + nodes.add(createNode("000ea4cd-f449-4dcb-869a-5317c63bd619", 50)); + nodes.add(createNode("010ea4cd-f449-4dcb-869a-5317c63bd62a", 60)); + nodes.add(createNode("0209ec54-ff53-4fdb-8239-5a3ac1fb31bd", 70)); + nodes.add(createNode("0309ec54-ff53-4fdb-8239-5a3ac1fb31ef", 80)); + nodes.add(createNode("040c9b94-02ae-45a6-9d5c-a066dbdf2636", 90)); + nodes.add(createNode("050c9b94-02ae-45a6-9d5c-a066dbdf2747", 100)); + nodes.add(createNode("0601f916-4357-4cfe-a7df-49d4721690bf", 110)); + nodes.add(createNode("0702f916-4357-4cfe-a7df-49d4721691c0", 120)); + + Map<Object, Integer> data = new LinkedHashMap<>(); + + data.put("", 4); + data.put("asdf", 4); + data.put("224ea4cd-f449-4dcb-869a-5317c63bd619", 5); + data.put("fdc9ec54-ff53-4fdb-8239-5a3ac1fb31bd", 2); + data.put("0f9c9b94-02ae-45a6-9d5c-a066dbdf2636", 2); + data.put("d8f1f916-4357-4cfe-a7df-49d4721690bf", 7); + data.put("c77ffeae-78a1-4ee6-a0fd-8d197a794412", 3); + data.put("35de9f21-3c9b-4f4a-a7d5-3e2c6cb01564", 1); + data.put("d67eb652-4e76-47fb-ad4e-cd902d9b868a", 7); + + data.put(0, 4); + data.put(1, 7); + data.put(12, 5); + data.put(123, 6); + data.put(1234, 4); + data.put(12345, 6); + data.put(123456, 6); + data.put(1234567, 6); + data.put(12345678, 0); + data.put(123456789, 7); + data.put(1234567890, 7); + data.put(1234567890L, 7); + data.put(12345678901L, 2); + data.put(123456789012L, 1); + data.put(1234567890123L, 0); + data.put(12345678901234L, 1); + data.put(123456789012345L, 6); + data.put(1234567890123456L, 7); + data.put(-23456789012345L, 4); + data.put(-2345678901234L, 1); + data.put(-234567890123L, 5); + data.put(-23456789012L, 5); + data.put(-2345678901L, 7); + data.put(-234567890L, 4); + data.put(-234567890, 7); + data.put(-23456789, 7); + data.put(-2345678, 0); + data.put(-234567, 6); + 
data.put(-23456, 6); + data.put(-2345, 6); + data.put(-234, 7); + data.put(-23, 5); + data.put(-2, 4); + + data.put(0x80000000, 4); + data.put(0x7fffffff, 7); + data.put(0x8000000000000000L, 4); + data.put(0x7fffffffffffffffL, 4); + + data.put(+1.1, 3); + data.put(-10.01, 4); + data.put(+100.001, 4); + data.put(-1000.0001, 4); + data.put(+1.7976931348623157E+308, 6); + data.put(-1.7976931348623157E+308, 6); + data.put(+4.9E-324, 7); + data.put(-4.9E-324, 7); + + boolean ok = true; + + for (Map.Entry<Object, Integer> entry : data.entrySet()) { + UUID exp = nodes.get(entry.getValue()).nodeId(); + UUID act = aff.node(entry.getKey(), nodes).nodeId(); + + if (exp.equals(act)) + continue; + + ok = false; + + info("Failed to validate affinity for key '" + entry.getKey() + "' [expected=" + exp + + ", actual=" + act + "]."); + } + + if (ok) + return; + + fail("Client partitioned affinity validation fails."); + } + + /** + * Test predefined affinity with a replica-count hash resolver - must be ported to other clients. + */ + @SuppressWarnings("UnaryPlus") + public void testPredefinedHashIdResolver() throws Exception { + // Use Md5 hasher for this test. 
+ GridClientPartitionAffinity aff = new GridClientPartitionAffinity(); + + getTestResources().inject(aff); + + aff.setHashIdResolver(new GridClientPartitionAffinity.HashIdResolver() { + @Override public Object getHashId(GridClientNode node) { + return node.replicaCount(); + } + }); + + List<GridClientNode> nodes = new ArrayList<>(); + + nodes.add(createNode("000ea4cd-f449-4dcb-869a-5317c63bd619", 50)); + nodes.add(createNode("010ea4cd-f449-4dcb-869a-5317c63bd62a", 60)); + nodes.add(createNode("0209ec54-ff53-4fdb-8239-5a3ac1fb31bd", 70)); + nodes.add(createNode("0309ec54-ff53-4fdb-8239-5a3ac1fb31ef", 80)); + nodes.add(createNode("040c9b94-02ae-45a6-9d5c-a066dbdf2636", 90)); + nodes.add(createNode("050c9b94-02ae-45a6-9d5c-a066dbdf2747", 100)); + nodes.add(createNode("0601f916-4357-4cfe-a7df-49d4721690bf", 110)); + nodes.add(createNode("0702f916-4357-4cfe-a7df-49d4721691c0", 120)); + + Map<Object, Integer> data = new LinkedHashMap<>(); + + data.put("", 4); + data.put("asdf", 3); + data.put("224ea4cd-f449-4dcb-869a-5317c63bd619", 5); + data.put("fdc9ec54-ff53-4fdb-8239-5a3ac1fb31bd", 2); + data.put("0f9c9b94-02ae-45a6-9d5c-a066dbdf2636", 2); + data.put("d8f1f916-4357-4cfe-a7df-49d4721690bf", 4); + data.put("c77ffeae-78a1-4ee6-a0fd-8d197a794412", 3); + data.put("35de9f21-3c9b-4f4a-a7d5-3e2c6cb01564", 4); + data.put("d67eb652-4e76-47fb-ad4e-cd902d9b868a", 2); + + data.put(0, 4); + data.put(1, 1); + data.put(12, 7); + data.put(123, 1); + data.put(1234, 6); + data.put(12345, 2); + data.put(123456, 5); + data.put(1234567, 4); + data.put(12345678, 6); + data.put(123456789, 3); + data.put(1234567890, 3); + data.put(1234567890L, 3); + data.put(12345678901L, 0); + data.put(123456789012L, 1); + data.put(1234567890123L, 3); + data.put(12345678901234L, 5); + data.put(123456789012345L, 5); + data.put(1234567890123456L, 7); + data.put(-23456789012345L, 6); + data.put(-2345678901234L, 4); + data.put(-234567890123L, 3); + data.put(-23456789012L, 0); + data.put(-2345678901L, 4); + 
data.put(-234567890L, 5); + data.put(-234567890, 3); + data.put(-23456789, 3); + data.put(-2345678, 6); + data.put(-234567, 4); + data.put(-23456, 5); + data.put(-2345, 2); + data.put(-234, 7); + data.put(-23, 6); + data.put(-2, 6); + + data.put(0x80000000, 7); + data.put(0x7fffffff, 1); + data.put(0x8000000000000000L, 7); + data.put(0x7fffffffffffffffL, 7); + + data.put(+1.1, 2); + data.put(-10.01, 0); + data.put(+100.001, 2); + data.put(-1000.0001, 0); + data.put(+1.7976931348623157E+308, 6); + data.put(-1.7976931348623157E+308, 1); + data.put(+4.9E-324, 1); + data.put(-4.9E-324, 1); + + boolean ok = true; + + for (Map.Entry<Object, Integer> entry : data.entrySet()) { + UUID exp = nodes.get(entry.getValue()).nodeId(); + UUID act = aff.node(entry.getKey(), nodes).nodeId(); + + if (exp.equals(act)) + continue; + + ok = false; + + info("Failed to validate affinity for key '" + entry.getKey() + "' [expected=" + exp + + ", actual=" + act + "]."); + } + + if (ok) + return; + + fail("Client partitioned affinity validation fails."); + } + + /** + * Create node with specified node id and replica count. + * + * @param nodeId Node id. + * @param replicaCnt Node partitioned affinity replica count. + * @return New node with specified node id and replica count. + */ + private GridClientNode createNode(String nodeId, int replicaCnt) { + return GridClientNodeImpl.builder() + .nodeId(UUID.fromString(nodeId)) + .replicaCount(replicaCnt) + .build(); + } + + /** + * Validate client partitioned affinity and cache partitioned affinity produce the same result. + * + * @throws Exception On any exception. + */ + public void testReplicas() throws Exception { + // Emulate nodes in topology. + Collection<GridClientNode> nodes = new ArrayList<>(); + Collection<ClusterNode> srvNodes = new ArrayList<>(); + + // Define affinities to test. 
+ GridClientPartitionAffinity aff = new GridClientPartitionAffinity(); + + getTestResources().inject(aff); + + aff.setHashIdResolver(HASH_ID_RSLVR); + + CacheConsistentHashAffinityFunction srvAff = new CacheConsistentHashAffinityFunction(); + + getTestResources().inject(srvAff); + + srvAff.setHashIdResolver(new CacheAffinityNodeIdHashResolver()); + + // Define keys to test affinity for. + Collection<String> keys = new ArrayList<>( + Arrays.asList("", "1", "12", "asdf", "Hadoop\u3092\u6bba\u3059")); + + for (int i = 0; i < 10; i++) + keys.add(UUID.randomUUID().toString()); + + // Test affinity behaviour on different topologies. + for (int i = 0; i < 20; i++) { + addNodes(1 + (int)Math.round(Math.random() * 50), nodes, srvNodes); + + for (String key : keys) + assertSameAffinity(key, aff, srvAff, nodes, srvNodes); + } + } + + /** + * Add {@code cnt} nodes into emulated topology. + * + * @param cnt Number of nodes to add into emulated topology. + * @param nodes Client topology. + * @param srvNodes Server topology. + */ + private void addNodes(int cnt, Collection<GridClientNode> nodes, Collection<ClusterNode> srvNodes) { + while (cnt-- > 0) { + UUID nodeId = UUID.randomUUID(); + int replicaCnt = (int)Math.round(Math.random() * 500) + 1; + + nodes.add(GridClientNodeImpl.builder() + .nodeId(nodeId) + .replicaCount(replicaCnt) + .build()); + + ClusterNode srvNode = new TestRichNode(nodeId, replicaCnt); + + srvNodes.add(srvNode); + } + } + + /** + * Compare server and client affinity for specified key in current topology. + * + * @param key Key to validate affinity for. + * @param aff Client affinity. + * @param srvAff Server affinity. + * @param nodes Client topology. + * @param srvNodes Server topology. + */ + private void assertSameAffinity(Object key, GridClientDataAffinity aff, CacheAffinityFunction srvAff, + Collection<? 
extends GridClientNode> nodes, Collection<ClusterNode> srvNodes) { + GridClientNode node = aff.node(key, nodes); + int part = srvAff.partition(key); + + CacheAffinityFunctionContext ctx = new GridCacheAffinityFunctionContextImpl(new ArrayList<>(srvNodes), + null, null, 1, 0); + + ClusterNode srvNode = F.first(srvAff.assignPartitions(ctx).get(part)); + + if (node == null) + assertNull(srvNode); + else { + assertNotNull(srvNode); + assertEquals(node.nodeId(), srvNode.id()); + } + } + + /** + * Rich node stub to use in emulated server topology. + */ + private static class TestRichNode extends GridTestNode { + /** + * Node id. + */ + private final UUID nodeId; + + /** + * Partitioned affinity replicas count. + */ + private final Integer replicaCnt; + + /** + * Externalizable class requires public no-arg constructor. + */ + @SuppressWarnings("UnusedDeclaration") + public TestRichNode() { + this(UUID.randomUUID(), DFLT_REPLICA_COUNT); + } + + /** + * Constructs rich node stub to use in emulated server topology. + * + * @param nodeId Node id. + * @param replicaCnt Partitioned affinity replicas count. 
+ */ + private TestRichNode(UUID nodeId, int replicaCnt) { + this.nodeId = nodeId; + this.replicaCnt = replicaCnt; + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public <T> T attribute(String name) { + if (DFLT_REPLICA_COUNT_ATTR_NAME.equals(name)) + return (T)replicaCnt; + + return super.attribute(name); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPropertiesConfigurationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPropertiesConfigurationSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPropertiesConfigurationSelfTest.java new file mode 100644 index 0000000..4b6c1ca --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPropertiesConfigurationSelfTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.commons.io.*; +import org.apache.ignite.client.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.balancer.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.springframework.context.support.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.apache.ignite.client.GridClientConfiguration.*; + +/** + * Properties-based configuration self test. + */ +public class ClientPropertiesConfigurationSelfTest extends GridCommonAbstractTest { + /** + * Grid client spring configuration. + */ + private static final URL GRID_CLIENT_SPRING_CONFIG; + + /** + * Grid client properties-based configuration. + */ + private static final URL GRID_CLIENT_CONFIG; + + /** + * Resolves both configuration URLs via {@code U.resolveGridGainUrl}. + */ + static { + GRID_CLIENT_SPRING_CONFIG = + U.resolveGridGainUrl("/modules/clients/config/grid-client-spring-config.xml"); + + GRID_CLIENT_CONFIG = U.resolveGridGainUrl("/modules/clients/config/grid-client-config.properties"); + } + + /** + * Test client configuration loaded from the properties. + * + * @throws Exception In case of exception. + */ + public void testCreation() throws Exception { + // Validate default configuration. + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setServers(Arrays.asList("localhost:11211")); + + validateConfig(0, cfg); + + // Validate default properties-based configuration. + cfg = new GridClientConfiguration(); + + cfg.setServers(Arrays.asList("localhost:11211")); + + validateConfig(0, cfg); + + // Validate loaded configuration. + Properties props = loadProperties(1, GRID_CLIENT_CONFIG); + validateConfig(0, new GridClientConfiguration(props)); + + // Validate loaded configuration with changed key prefixes. 
+ Properties props2 = new Properties(); + + for (Map.Entry<Object, Object> e : props.entrySet()) + props2.put("new." + e.getKey(), e.getValue()); + + validateConfig(0, new GridClientConfiguration("new.gg.client", props2)); + validateConfig(0, new GridClientConfiguration("new.gg.client.", props2)); + + // Validate loaded test configuration. + File tmp = uncommentProperties(GRID_CLIENT_CONFIG); + + props = loadProperties(25, tmp.toURI().toURL()); + validateConfig(2, new GridClientConfiguration(props)); + + // Validate loaded test configuration with changed key prefixes. + props2 = new Properties(); + + for (Map.Entry<Object, Object> e : props.entrySet()) + props2.put("new." + e.getKey(), e.getValue()); + + validateConfig(2, new GridClientConfiguration("new.gg.client", props2)); + validateConfig(2, new GridClientConfiguration("new.gg.client.", props2)); + + // Validate loaded test configuration with empty key prefixes. + props2 = new Properties(); + + for (Map.Entry<Object, Object> e : props.entrySet()) + props2.put(e.getKey().toString().replace("gg.client.", ""), e.getValue()); + + validateConfig(2, new GridClientConfiguration("", props2)); + validateConfig(2, new GridClientConfiguration(".", props2)); + } + + /** + * Validate spring client configuration. + * + * @throws Exception In case of any exception. 
+ */ + public void testSpringConfig() throws Exception { + GridClientConfiguration cfg = new FileSystemXmlApplicationContext( + GRID_CLIENT_SPRING_CONFIG.toString()).getBean(GridClientConfiguration.class); + + assertEquals(Arrays.asList("127.0.0.1:11211"), new ArrayList<>(cfg.getServers())); + assertNull(cfg.getSecurityCredentialsProvider()); + + Collection<GridClientDataConfiguration> dataCfgs = cfg.getDataConfigurations(); + + assertEquals(1, dataCfgs.size()); + + GridClientDataConfiguration dataCfg = dataCfgs.iterator().next(); + + assertEquals("partitioned", dataCfg.getName()); + + assertNotNull(dataCfg.getPinnedBalancer()); + assertEquals(GridClientRandomBalancer.class, dataCfg.getPinnedBalancer().getClass()); + + assertNotNull(dataCfg.getAffinity()); + assertEquals(GridClientPartitionAffinity.class, dataCfg.getAffinity().getClass()); + } + + /** + * Copy the properties to a temporary file with every {@code #gg.client.} prefix uncommented. + * + * @param url Source to uncomment client properties for. + * @return Temporary file with uncommented client properties. + * @throws IOException In case of IO exception. + */ + private File uncommentProperties(URL url) throws IOException { + Collection<String> lines = new ArrayList<>(); + + // try-with-resources closes the stream even if reading fails part-way. + try (InputStream in = url.openStream()) { + assertNotNull(in); + + LineIterator it = IOUtils.lineIterator(in, "UTF-8"); + + while (it.hasNext()) + lines.add(it.nextLine().replace("#gg.client.", "gg.client.")); + } + + File tmp = File.createTempFile(UUID.randomUUID().toString(), ".properties"); + + tmp.deleteOnExit(); + + FileUtils.writeLines(tmp, lines); + + return tmp; + } + + /** + * Load properties from the url. + * + * @param expLoaded Expected number of loaded properties. + * @param url URL to load properties from. + * @return Loaded properties. + * @throws IOException In case of IO exception. 
+ */ + private Properties loadProperties(int expLoaded, URL url) throws IOException { + Properties props = new Properties(); + + assertEquals(0, props.size()); + + // Open the stream as late as possible; try-with-resources + // closes it even if loading fails. + try (InputStream in = url.openStream()) { + props.load(in); + } + + assertEquals(expLoaded, props.size()); + + return props; + } + + /** + * Validate loaded configuration. + * + * @param expDataCfgs Expected data configurations count. + * @param cfg Client configuration to validate. + */ + private void validateConfig(int expDataCfgs, GridClientConfiguration cfg) { + assertEquals(GridClientRandomBalancer.class, cfg.getBalancer().getClass()); + assertEquals(10000, cfg.getConnectTimeout()); + assertEquals(null, cfg.getSecurityCredentialsProvider()); + + assertEquals(expDataCfgs, cfg.getDataConfigurations().size()); + + if (expDataCfgs == 2) { + GridClientDataConfiguration nullCfg = cfg.getDataConfiguration(null); + + assertEquals(null, nullCfg.getName()); + assertEquals(null, nullCfg.getAffinity()); + assertEquals(GridClientRandomBalancer.class, nullCfg.getPinnedBalancer().getClass()); + + GridClientDataConfiguration partCfg = cfg.getDataConfiguration("partitioned"); + + assertEquals("partitioned", partCfg.getName()); + assertEquals(GridClientPartitionAffinity.class, partCfg.getAffinity().getClass()); + assertEquals(GridClientRoundRobinBalancer.class, partCfg.getPinnedBalancer().getClass()); + } + + assertEquals(DFLT_MAX_CONN_IDLE_TIME, cfg.getMaxConnectionIdleTime()); + assertEquals(GridClientProtocol.TCP, cfg.getProtocol()); + assertEquals(Arrays.asList("localhost:11211"), new ArrayList<>(cfg.getServers())); + assertEquals(true, cfg.isEnableAttributesCache()); + assertEquals(true, cfg.isEnableMetricsCache()); + assertEquals(true, cfg.isTcpNoDelay()); + assertNull(cfg.getSslContextFactory()); + assertEquals(DFLT_TOP_REFRESH_FREQ, cfg.getTopologyRefreshFrequency()); + } +}