http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java index a8b2661,0000000..0eefb0f mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java @@@ -1,850 -1,0 +1,828 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.integration; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.balancer.*; +import org.apache.ignite.internal.client.ssl.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; - import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; ++import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Tests basic client behavior with multiple nodes. 
+ */ +@SuppressWarnings("ThrowableResultOfMethodCallIgnored") +public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Partitioned cache name. */ + private static final String PARTITIONED_CACHE_NAME = "partitioned"; + + /** Replicated cache name. */ + private static final String REPLICATED_CACHE_NAME = "replicated"; + + /** Replicated async cache name. */ + private static final String REPLICATED_ASYNC_CACHE_NAME = "replicated_async"; + + /** Nodes count. */ + public static final int NODES_CNT = 5; + + /** + * Topology update frequency. + * Set it longer than router's, so we wouldn't receive failures + * caused only by obsolete topology on router. + */ + static final int TOP_REFRESH_FREQ = 2500; + + /** Path to jetty config. */ + public static final String REST_JETTY_CFG = "modules/clients/src/test/resources/jetty/rest-jetty.xml"; + + /** Path to jetty config with SSl enabled. */ + public static final String REST_JETTY_SSL_CFG = "modules/clients/src/test/resources/jetty/rest-jetty-ssl.xml"; + + /** Host. */ + public static final String HOST = "127.0.0.1"; + + /** Base for tcp rest ports. */ + public static final int REST_TCP_PORT_BASE = 12345; + + /** Base for http rest ports, defined in {@link #REST_JETTY_CFG}. */ + public static final int REST_HTTP_PORT_BASE = 11080; + + /** Base for https rest ports, defined in {@link #REST_JETTY_SSL_CFG}. */ + public static final int REST_HTTPS_PORT_BASE = 11443; + + /** */ + private static volatile boolean commSpiEnabled; + + /** Flag to enable REST in node configuration. */ + private boolean restEnabled = true; + + /** Client instance for each test. */ + private GridClient client; + + /** + * @return Client protocol that should be used. + */ + protected abstract GridClientProtocol protocol(); + + /** + * @return Server address to create first connection. 
+ */ + protected abstract String serverAddress(); + + /** + * @return SSL context factory to use if SSL or {@code null} to disable SSL usage. + */ + @Nullable protected GridSslContextFactory sslContextFactory() { + return null; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setLocalHost(HOST); + + assert c.getConnectorConfiguration() == null; + + if (restEnabled) { + ConnectorConfiguration clientCfg = new ConnectorConfiguration(); + + clientCfg.setPort(REST_TCP_PORT_BASE); + + GridSslContextFactory sslCtxFactory = sslContextFactory(); + + if (sslCtxFactory != null) { + clientCfg.setSslEnabled(true); + clientCfg.setSslContextFactory(sslCtxFactory); + } + + c.setConnectorConfiguration(clientCfg); + } + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + TestCommunicationSpi spi = new TestCommunicationSpi(); + + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + + c.setCommunicationSpi(spi); + + c.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(PARTITIONED_CACHE_NAME), + cacheConfiguration(REPLICATED_CACHE_NAME), cacheConfiguration(REPLICATED_ASYNC_CACHE_NAME)); + - ThreadPoolExecutor exec = new ThreadPoolExecutor( - 40, - 40, - 0, - MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); ++ c.setPublicThreadPoolSize(40); + - exec.prestartAllCoreThreads(); - - c.setExecutorService(exec); - - c.setExecutorServiceShutdown(true); - - ThreadPoolExecutor sysExec = new ThreadPoolExecutor( - 40, - 40, - 0, - MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); - - sysExec.prestartAllCoreThreads(); - - c.setSystemExecutorService(sysExec); - - c.setSystemExecutorServiceShutdown(true); ++ c.setSystemThreadPoolSize(40); + + return c; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. 
+ * @throws Exception In case of error. + */ + private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setDistributionMode(NEAR_PARTITIONED); + + if (cacheName == null) + cfg.setCacheMode(LOCAL); + else if (PARTITIONED_CACHE_NAME.equals(cacheName)) { + cfg.setCacheMode(PARTITIONED); + + cfg.setBackups(0); + } + else + cfg.setCacheMode(REPLICATED); + + cfg.setName(cacheName); + + cfg.setWriteSynchronizationMode(REPLICATED_ASYNC_CACHE_NAME.equals(cacheName) ? FULL_ASYNC : FULL_SYNC); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + info("Stopping grids."); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + client = GridClientFactory.start(clientConfiguration()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + if (client != null) { + GridClientFactory.stop(client.id(), false); + + client = null; + } + } + + /** + * @throws Exception If failed. + */ + public void testSyncCommitRollbackFlags() throws Exception { + commSpiEnabled = true; + + try { + GridClientData data = client.data(REPLICATED_ASYNC_CACHE_NAME); + + info("Before put x1"); + + data.put("x1", "y1"); + + info("Before put x2"); + + data.flagsOn(GridClientCacheFlag.SYNC_COMMIT).put("x2", "y2"); + + info("Before put x3"); + + data.put("x3", "y3"); + + info("Before put x4"); + + data.flagsOn(GridClientCacheFlag.SYNC_COMMIT).put("x4", "y4"); + } + finally { + commSpiEnabled = false; + } + } + + /** + * @throws Exception If failed. + */ + public void testEmptyProjections() throws Exception { + final GridClientCompute dflt = client.compute(); + + Collection<? 
extends GridClientNode> nodes = dflt.nodes(); + + assertEquals(NODES_CNT, nodes.size()); + + Iterator<? extends GridClientNode> iter = nodes.iterator(); + + final GridClientCompute singleNodePrj = dflt.projection(Collections.singletonList(iter.next())); + + final GridClientNode second = iter.next(); + + final GridClientPredicate<GridClientNode> noneFilter = new GridClientPredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode node) { + return false; + } + }; + + final GridClientPredicate<GridClientNode> targetFilter = new GridClientPredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode node) { + return node.nodeId().equals(second.nodeId()); + } + }; + + GridTestUtils.assertThrows(log(), new Callable<Object>() { + @Override public Object call() throws Exception { + return singleNodePrj.projection(second); + } + }, GridClientException.class, null); + + GridTestUtils.assertThrows(log(), new Callable<Object>() { + @Override + public Object call() throws Exception { + return singleNodePrj.projection(targetFilter); + } + }, GridClientException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testProjectionRun() throws Exception { + GridClientCompute dflt = client.compute(); + + Collection<? extends GridClientNode> nodes = dflt.nodes(); + + assertEquals(NODES_CNT, nodes.size()); + + for (int i = 0; i < NODES_CNT; i++) { + Ignite g = grid(i); + + assert g != null; + + GridClientNode clientNode = dflt.node(g.cluster().localNode().id()); + + assertNotNull("Client node for " + g.cluster().localNode().id() + " was not found", clientNode); + + GridClientCompute prj = dflt.projection(clientNode); + + String res = prj.execute(TestTask.class.getName(), null); + + assertNotNull(res); + + assertEquals(g.cluster().localNode().id().toString(), res); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testAffinityExecute() throws Exception { + GridClientCompute dflt = client.compute(); + + GridClientData data = client.data(PARTITIONED_CACHE_NAME); + + Collection<? extends GridClientNode> nodes = dflt.nodes(); + + assertEquals(NODES_CNT, nodes.size()); + + for (int i = 0; i < NODES_CNT; i++) { + Ignite g = grid(i); + + assert g != null; + + int affinityKey = -1; + + for (int key = 0; key < 10000; key++) { + if (g.cluster().localNode().id().equals(data.affinity(key))) { + affinityKey = key; + + break; + } + } + + if (affinityKey == -1) + throw new Exception("Unable to found key for which node is primary: " + g.cluster().localNode().id()); + + GridClientNode clientNode = dflt.node(g.cluster().localNode().id()); + + assertNotNull("Client node for " + g.cluster().localNode().id() + " was not found", clientNode); + + String res = dflt.affinityExecute(TestTask.class.getName(), PARTITIONED_CACHE_NAME, affinityKey, null); + + assertNotNull(res); + + assertEquals(g.cluster().localNode().id().toString(), res); + } + } + + /** + * @throws Exception If failed. + */ + public void testInvalidateFlag() throws Exception { + IgniteEx g0 = grid(0); + + GridCache<String, String> cache = g0.cache(PARTITIONED_CACHE_NAME); + + String key = null; + + for (int i = 0; i < 10_000; i++) { + if (!cache.affinity().isPrimaryOrBackup(g0.localNode(), String.valueOf(i))) { + key = String.valueOf(i); + + break; + } + } + + assertNotNull(key); + + cache.put(key, key); // Create entry in near cache, it is invalidated if INVALIDATE flag is set. + + assertNotNull(cache.peek(key)); + + GridClientData d = client.data(PARTITIONED_CACHE_NAME); + + d.flagsOn(GridClientCacheFlag.INVALIDATE).put(key, "zzz"); + + for (Ignite g : G.allGrids()) { + cache = g.cache(PARTITIONED_CACHE_NAME); + + if (cache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key)) + assertEquals("zzz", cache.peek(key)); + else + assertNull(cache.peek(key)); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testClientAffinity() throws Exception { + GridClientData partitioned = client.data(PARTITIONED_CACHE_NAME); + + Collection<Object> keys = new ArrayList<>(); + + keys.addAll(Arrays.asList( + Boolean.TRUE, + Boolean.FALSE, + 1, + Integer.MAX_VALUE + )); + + Random rnd = new Random(); + StringBuilder sb = new StringBuilder(); + + // Generate some random strings. + for (int i = 0; i < 100; i++) { + sb.setLength(0); + + for (int j = 0; j < 255; j++) + // Only printable ASCII symbols for test. + sb.append((char)(rnd.nextInt(0x7f - 0x20) + 0x20)); + + keys.add(sb.toString()); + } + + // Generate some more keys to achieve better coverage. + for (int i = 0; i < 100; i++) + keys.add(UUID.randomUUID()); + + for (Object key : keys) { + UUID nodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); + + UUID clientNodeId = partitioned.affinity(key); + + assertEquals("Invalid affinity mapping for REST response for key: " + key, nodeId, clientNodeId); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTopologyListener() throws Exception { + final Collection<UUID> added = new ArrayList<>(1); + final Collection<UUID> rmvd = new ArrayList<>(1); + + final CountDownLatch addedLatch = new CountDownLatch(1); + final CountDownLatch rmvLatch = new CountDownLatch(1); + + assertEquals(NODES_CNT, client.compute().refreshTopology(false, false).size()); + + GridClientTopologyListener lsnr = new GridClientTopologyListener() { + @Override public void onNodeAdded(GridClientNode node) { + added.add(node.nodeId()); + + addedLatch.countDown(); + } + + @Override public void onNodeRemoved(GridClientNode node) { + rmvd.add(node.nodeId()); + + rmvLatch.countDown(); + } + }; + + client.addTopologyListener(lsnr); + + try { + Ignite g = startGrid(NODES_CNT + 1); + + UUID id = g.cluster().localNode().id(); + + assertTrue(addedLatch.await(2 * TOP_REFRESH_FREQ, MILLISECONDS)); + + assertEquals(1, added.size()); + assertEquals(id, F.first(added)); + + stopGrid(NODES_CNT + 1); + + assertTrue(rmvLatch.await(2 * TOP_REFRESH_FREQ, MILLISECONDS)); + + assertEquals(1, rmvd.size()); + assertEquals(id, F.first(rmvd)); + } + finally { + client.removeTopologyListener(lsnr); + + stopGrid(NODES_CNT + 1); + } + } + + /** + * @throws Exception If failed. + */ + public void testDisabledRest() throws Exception { + restEnabled = false; + + final Ignite g = startGrid("disabled-rest"); + + try { + Thread.sleep(2 * TOP_REFRESH_FREQ); + + // As long as we have round robin load balancer this will cause every node to be queried. + for (int i = 0; i < NODES_CNT + 1; i++) + assertEquals(NODES_CNT + 1, client.compute().refreshTopology(false, false).size()); + + final GridClientData data = client.data(PARTITIONED_CACHE_NAME); + + // Check rest-disabled node is unavailable. 
+ try { + String affKey; + + do { + affKey = UUID.randomUUID().toString(); + } while (!data.affinity(affKey).equals(g.cluster().localNode().id())); + + data.put(affKey, "asdf"); + + assertEquals("asdf", cache(0, PARTITIONED_CACHE_NAME).get(affKey)); + } + catch (GridServerUnreachableException e) { + // Thrown for direct client-node connections. + assertTrue("Unexpected exception message: " + e.getMessage(), + e.getMessage().startsWith("No available endpoints to connect (is rest enabled for this node?)")); + } + catch (GridClientException e) { + // Thrown for routed client-router-node connections. + String msg = e.getMessage(); + + assertTrue("Unexpected exception message: " + msg, protocol() == GridClientProtocol.TCP ? + msg.contains("No available endpoints to connect (is rest enabled for this node?)") : // TCP router. + msg.startsWith("No available nodes on the router for destination node ID")); // HTTP router. + } + + // Check rest-enabled nodes are available. + String affKey; + + do { + affKey = UUID.randomUUID().toString(); + } while (data.affinity(affKey).equals(g.cluster().localNode().id())); + + data.put(affKey, "fdsa"); + + assertEquals("fdsa", cache(0, PARTITIONED_CACHE_NAME).get(affKey)); + } + finally { + restEnabled = true; + + G.stop(g.name(), true); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testAffinityPut() throws Exception { + Thread.sleep(2 * TOP_REFRESH_FREQ); + + assertEquals(NODES_CNT, client.compute().refreshTopology(false, false).size()); + + Map<UUID, Ignite> gridsByLocNode = new HashMap<>(NODES_CNT); + + GridClientData partitioned = client.data(PARTITIONED_CACHE_NAME); + + GridClientCompute compute = client.compute(); + + for (int i = 0; i < NODES_CNT; i++) + gridsByLocNode.put(grid(i).localNode().id(), grid(i)); + + for (int i = 0; i < 100; i++) { + String key = "key" + i; + + UUID primaryNodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); + + assertEquals("Affinity mismatch for key: " + key, primaryNodeId, partitioned.affinity(key)); + + assertEquals(primaryNodeId, partitioned.affinity(key)); + + // Must go to primary node only. Since backup count is 0, value must present on + // primary node only. + partitioned.put(key, "val" + key); + + for (Map.Entry<UUID, Ignite> entry : gridsByLocNode.entrySet()) { + Object val = entry.getValue().cache(PARTITIONED_CACHE_NAME).peek(key); + + if (primaryNodeId.equals(entry.getKey())) + assertEquals("val" + key, val); + else + assertNull(val); + } + } + + // Now check that we will see value in near cache in pinned mode. + for (int i = 100; i < 200; i++) { + String pinnedKey = "key" + i; + + UUID primaryNodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, pinnedKey).id(); + + UUID pinnedNodeId = F.first(F.view(gridsByLocNode.keySet(), F.notEqualTo(primaryNodeId))); + + GridClientNode node = compute.node(pinnedNodeId); + + partitioned.pinNodes(node).put(pinnedKey, "val" + pinnedKey); + + for (Map.Entry<UUID, Ignite> entry : gridsByLocNode.entrySet()) { + Object val = entry.getValue().cache(PARTITIONED_CACHE_NAME).peek(pinnedKey); + + if (primaryNodeId.equals(entry.getKey()) || pinnedNodeId.equals(entry.getKey())) + assertEquals("val" + pinnedKey, val); + else + assertNull(val); + } + } + } + + /** + * @return Client configuration for the test. 
+ */ + protected GridClientConfiguration clientConfiguration() throws GridClientException { + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setBalancer(getBalancer()); + + cfg.setTopologyRefreshFrequency(TOP_REFRESH_FREQ); + + cfg.setProtocol(protocol()); + cfg.setServers(Arrays.asList(serverAddress())); + cfg.setSslContextFactory(sslContextFactory()); + + GridClientDataConfiguration loc = new GridClientDataConfiguration(); + + GridClientDataConfiguration partitioned = new GridClientDataConfiguration(); + + partitioned.setName(PARTITIONED_CACHE_NAME); + partitioned.setAffinity(new GridClientPartitionAffinity()); + + GridClientDataConfiguration replicated = new GridClientDataConfiguration(); + replicated.setName(REPLICATED_CACHE_NAME); + + GridClientDataConfiguration replicatedAsync = new GridClientDataConfiguration(); + replicatedAsync.setName(REPLICATED_ASYNC_CACHE_NAME); + + cfg.setDataConfigurations(Arrays.asList(loc, partitioned, replicated, replicatedAsync)); + + return cfg; + } + + /** + * Gets client load balancer. + * + * @return Load balancer. + */ + protected GridClientLoadBalancer getBalancer() { + return new GridClientRoundRobinBalancer(); + } + + /** + * Test task. Returns a tuple in which first component is id of node that has split the task, + * and second component is count of nodes that executed jobs. + */ + private static class TestTask extends ComputeTaskSplitAdapter<Object, String> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** Count of tasks this job was split to. */ + private int gridSize; + + /** {@inheritDoc} */ + @Override protected Collection<? 
extends ComputeJob> split(int gridSize, Object arg) { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(gridSize); + + this.gridSize = gridSize; + + final String locNodeId = ignite.cluster().localNode().id().toString(); + + for (int i = 0; i < gridSize; i++) { + jobs.add(new ComputeJobAdapter() { + @SuppressWarnings("OverlyStrongTypeCast") + @Override public Object execute() { + try { + Thread.sleep(1000); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + + return new IgniteBiTuple<>(locNodeId, 1); + } + }); + } + + return jobs; + } + + /** {@inheritDoc} */ + @Override public String reduce(List<ComputeJobResult> results) { + int sum = 0; + + String locNodeId = null; + + for (ComputeJobResult res : results) { + IgniteBiTuple<String, Integer> part = res.getData(); + + if (locNodeId == null) + locNodeId = part.get1(); + + Integer i = part.get2(); + + if (i != null) + sum += i; + } + + assert gridSize == sum; + + return locNodeId; + } + } + + /** + * Communication SPI which checks cache flags. + */ + @SuppressWarnings("unchecked") + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) ++ @Override public void sendMessage(ClusterNode node, MessageAdapter msg) + throws IgniteSpiException { + checkSyncFlags((GridIoMessage)msg); + + super.sendMessage(node, msg); + } + + /** + * Check if flags in correct state. + * + * @param msg Message. 
+ */ + private void checkSyncFlags(GridIoMessage msg) { + if (!commSpiEnabled) + return; + + Object o = msg.message(); + + if (!(o instanceof GridDistributedLockRequest)) + return; + + IgniteKernal g = (IgniteKernal)G.ignite(ignite.configuration().getNodeId()); + + GridCacheContext<Object, Object> cacheCtx = g.internalCache(REPLICATED_ASYNC_CACHE_NAME).context(); + + IgniteTxManager<Object, Object> tm = cacheCtx.tm(); + + GridCacheVersion v = ((GridCacheVersionable)o).version(); + + IgniteInternalTx t = tm.tx(v); + + if (t.hasWriteKey(cacheCtx.txKey("x1"))) + assertFalse("Invalid tx flags: " + t, t.syncCommit()); + else if (t.hasWriteKey(cacheCtx.txKey("x2"))) + assertTrue("Invalid tx flags: " + t, t.syncCommit()); + else if (t.hasWriteKey(cacheCtx.txKey("x3"))) + assertFalse("Invalid tx flags: " + t, t.syncCommit()); + else if (t.hasWriteKey(cacheCtx.txKey("x4"))) + assertTrue("Invalid tx flags: " + t, t.syncCommit()); + } + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedCommand() throws Exception { + final GridClientData data = client.data(PARTITIONED_CACHE_NAME); + final GridClientCompute compute = client.compute(); + final AtomicInteger cnt = new AtomicInteger(0); + + multithreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 20; i++) { + String key = UUID.randomUUID().toString(); + String val = UUID.randomUUID().toString(); + + switch (cnt.incrementAndGet() % 4) { + case 0: { + assertTrue(data.put(key, val)); + assertEquals(val, data.get(key)); + assertTrue(data.remove(key)); + + break; + } + + case 1: { + assertNotNull(data.metrics()); + + break; + } + + case 2: { + String nodeId = compute.execute(TestTask.class.getName(), null); + + assertNotNull(nodeId); + assertNotNull(compute.refreshNode(UUID.fromString(nodeId), true, true)); + + break; + } + + case 3: { + assertEquals(NODES_CNT, compute.refreshTopology(true, true).size()); + + break; + } + } + } + + return null; + } + 
}, 50, "multithreaded-client-access"); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java index 20bb62a,0000000..383f76a mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java @@@ -1,1480 -1,0 +1,1599 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.integration; + +import junit.framework.*; +import net.sf.json.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.ssl.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.testframework.GridTestUtils.*; + +/** + * Tests for Java client. + */ +@SuppressWarnings("deprecation") +public abstract class ClientAbstractSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + public static final String HOST = "127.0.0.1"; + + /** */ + public static final int JETTY_PORT = 8080; + + /** */ + public static final int BINARY_PORT = 11212; + + /** Path to jetty config. */ + public static final String REST_JETTY_CFG = "modules/clients/src/test/resources/jetty/rest-jetty.xml"; + + /** Need to be static because configuration inits only once per class. 
*/ + private static final ConcurrentMap<Object, Object> INTERCEPTED_OBJECTS = new ConcurrentHashMap<>(); + + /** */ + private static final Map<String, HashMapStore> cacheStores = new HashMap<>(); + + /** Path to test log. */ + private static final String TEST_LOG_PATH = "modules/core/src/test/resources/log/ignite.log.tst"; + + /** */ + public static final String ROUTER_LOG_CFG = "modules/core/src/test/config/log4j-test.xml"; + + /** */ + private static final String INTERCEPTED_SUF = "intercepted"; + + /** */ + private static final String[] TASK_ARGS = new String[] {"executing", "test", "task"}; + + /** Flag indicating whether intercepted objects should be overwritten. */ + private static volatile boolean overwriteIntercepted; + + /** */ + private ExecutorService exec; + + /** */ + protected GridClient client; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty(IGNITE_JETTY_PORT, Integer.toString(JETTY_PORT)); + + startGrid(); + + System.clearProperty(IGNITE_JETTY_PORT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + exec = Executors.newCachedThreadPool(); + + client = client(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + exec.shutdown(); + exec = null; + + GridClientFactory.stop(client.id(), true); + + client = null; + + for (HashMapStore cacheStore : cacheStores.values()) + cacheStore.map.clear(); + - grid().cache(null).clearAll(); - grid().cache(CACHE_NAME).clearAll(); ++ grid().cache(null).clear(); ++ grid().cache(CACHE_NAME).clear(); + + INTERCEPTED_OBJECTS.clear(); + } + + /** + * Gets protocol which should be used in client connection. + * + * @return Protocol. + */ + protected abstract GridClientProtocol protocol(); + + /** + * Gets server address to which client should connect. 
+ * + * @return Server address in format "host:port". + */ + protected abstract String serverAddress(); + + /** + * @return Whether SSL should be used in test. + */ + protected abstract boolean useSsl(); + + /** + * @return SSL context factory used in test. + */ + protected abstract GridSslContextFactory sslContextFactory(); + + /** + * Get task name. + * + * @return Task name. + */ + protected String getTaskName() { + return TestTask.class.getName(); + } + + /** + * @return name of the sleep task for current test. + */ + protected String getSleepTaskName() { + return SleepTestTask.class.getName(); + } + + /** + * Get task argument. + * + * @return Task argument. + */ + protected Object getTaskArgument() { + return Arrays.asList(TASK_ARGS); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost(HOST); + + assert cfg.getConnectorConfiguration() == null; + + ConnectorConfiguration clientCfg = new ConnectorConfiguration(); + + clientCfg.setPort(BINARY_PORT); + ++ clientCfg.setRestAccessibleFolders( ++ U.getGridGainHome() + "/work/log", ++ U.resolveGridGainPath("modules/core/src/test/resources/log").getAbsolutePath()); ++ + if (useSsl()) { + clientCfg.setSslEnabled(true); + + clientCfg.setSslContextFactory(sslContextFactory()); + } + + cfg.setConnectorConfiguration(clientCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration("replicated"), + cacheConfiguration("partitioned"), cacheConfiguration(CACHE_NAME)); + + clientCfg.setMessageInterceptor(new ConnectorMessageInterceptor() { + @Override + public Object onReceive(@Nullable Object obj) { + if (obj != null) + INTERCEPTED_OBJECTS.put(obj, obj); + + return overwriteIntercepted && obj instanceof String ? 
+ obj + INTERCEPTED_SUF : obj; + } + + @Override + public Object onSend(Object obj) { + if (obj != null) + INTERCEPTED_OBJECTS.put(obj, obj); + + return obj; + } + }); + + // Specify swap SPI, otherwise test fails on windows. + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + @SuppressWarnings("unchecked") + private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setCacheMode(cacheName == null || CACHE_NAME.equals(cacheName) ? LOCAL : "replicated".equals(cacheName) ? + REPLICATED : PARTITIONED); + cfg.setName(cacheName); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + HashMapStore cacheStore = cacheStores.get(cacheName); + + if (cacheStore == null) + cacheStores.put(cacheName, cacheStore = new HashMapStore()); + + cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(cacheStore)); + cfg.setWriteThrough(true); + cfg.setReadThrough(true); + cfg.setLoadPreviousValue(true); + + cfg.setSwapEnabled(true); + + if (cfg.getCacheMode() == PARTITIONED) + cfg.setBackups(1); + + return cfg; + } + + /** + * @return Client. + * @throws GridClientException In case of error. + */ + protected GridClient client() throws GridClientException { + return GridClientFactory.start(clientConfiguration()); + } + + /** + * @return Test client configuration. 
+ */ + protected GridClientConfiguration clientConfiguration() throws GridClientException { + GridClientConfiguration cfg = new GridClientConfiguration(); + + GridClientDataConfiguration nullCache = new GridClientDataConfiguration(); + + GridClientDataConfiguration cache = new GridClientDataConfiguration(); + + cache.setName(CACHE_NAME); + + cfg.setDataConfigurations(Arrays.asList(nullCache, cache)); + + cfg.setProtocol(protocol()); + cfg.setServers(Arrays.asList(serverAddress())); + + // Setting custom executor, to avoid failures on client shutdown. + // And applying custom naming scheme to ease debugging. + cfg.setExecutorService(Executors.newCachedThreadPool(new ThreadFactory() { + private AtomicInteger cntr = new AtomicInteger(); + + @SuppressWarnings("NullableProblems") + @Override public Thread newThread(Runnable r) { + return new Thread(r, "client-worker-thread-" + cntr.getAndIncrement()); + } + })); + + if (useSsl()) + cfg.setSslContextFactory(sslContextFactory()); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testConnectable() throws Exception { + GridClient client = client(); + + List<GridClientNode> nodes = client.compute().refreshTopology(false, false); + + assertTrue(F.first(nodes).connectable()); + } + + /** + * Check async API methods don't generate exceptions. + * + * @throws Exception If failed. 
+ */ + public void testNoAsyncExceptions() throws Exception { + GridClient client = client(); + + GridClientData data = client.data(CACHE_NAME); + GridClientCompute compute = client.compute().projection(new GridClientPredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode e) { + return false; + } + }); + + Map<String, GridClientFuture<?>> futs = new LinkedHashMap<>(); + + futs.put("exec", compute.executeAsync("taskName", "taskArg")); + futs.put("affExec", compute.affinityExecuteAsync("taskName", "cacheName", "affKey", "taskArg")); + futs.put("refreshById", compute.refreshNodeAsync(UUID.randomUUID(), true, true)); + futs.put("refreshByIP", compute.refreshNodeAsync("nodeIP", true, true)); + futs.put("refreshTop", compute.refreshTopologyAsync(true, true)); + + GridClientFactory.stop(client.id(), false); + + futs.put("put", data.putAsync("key", "val")); + futs.put("putAll", data.putAllAsync(F.asMap("key", "val"))); + futs.put("get", data.getAsync("key")); + futs.put("getAll", data.getAllAsync(Arrays.asList("key"))); + futs.put("remove", data.removeAsync("key")); + futs.put("removeAll", data.removeAllAsync(Arrays.asList("key"))); + futs.put("replace", data.replaceAsync("key", "val")); + futs.put("cas", data.casAsync("key", "val", "val2")); + futs.put("metrics", data.metricsAsync()); + + for (Map.Entry<String, GridClientFuture<?>> e : futs.entrySet()) { + try { + e.getValue().get(); + + info("Expects '" + e.getKey() + "' fails with grid client exception."); + } + catch (GridServerUnreachableException |GridClientClosedException ignore) { + // No op: compute projection is empty. + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testGracefulShutdown() throws Exception { + GridClientCompute compute = client.compute(); + + Object taskArg = getTaskArgument(); + String taskName = getSleepTaskName(); + + GridClientFuture<Object> fut = compute.executeAsync(taskName, taskArg); + GridClientFuture<Object> fut2 = compute.executeAsync(taskName, taskArg); + + GridClientFactory.stop(client.id(), true); + + Assert.assertEquals(17, fut.get()); + Assert.assertEquals(17, fut2.get()); + } + + /** + * @throws Exception If failed. + */ + public void testForceShutdown() throws Exception { + GridClientCompute compute = client.compute(); + + Object taskArg = getTaskArgument(); + String taskName = getSleepTaskName(); + + GridClientFuture<Object> fut = compute.executeAsync(taskName, taskArg); + + GridClientFactory.stop(client.id(), false); + + try { + fut.get(); + } + catch (GridClientClosedException ignored) { + return; + } + + Assert.fail("Expected GridClientClosedException."); + } + + /** + * @throws Exception If failed. + */ + public void testShutdown() throws Exception { + GridClient c = client(); + + GridClientCompute compute = c.compute(); + + String taskName = getTaskName(); + Object taskArg = getTaskArgument(); + + Collection<GridClientFuture<Object>> futs = new ArrayList<>(); + + // Validate connection works. + compute.execute(taskName, taskArg); + + info(">>> First task executed successfully, running batch."); + - for (int i = 0; i < 100; i++) ++ for (int i = 0; i < 10; i++) + futs.add(compute.executeAsync(taskName, taskArg)); + + // Stop client. + GridClientFactory.stop(c.id(), true); + + info(">>> Completed stop request."); + + int failed = 0; + + for (GridClientFuture<Object> fut : futs) { + try { + assertEquals(17, fut.get()); + } + catch (GridClientException e) { + failed++; + + log.warning("Task execution failed.", e); + } + } + + assertEquals(0, failed); + } + + /** + * @throws Exception If failed. 
     */
    public void testPut() throws Exception {
        // Values written through the client (sync and async) must be readable from the server-side caches.
        GridCache<String, String> dfltCache = grid().cache(null);
        GridCache<Object, Object> namedCache = grid().cache(CACHE_NAME);

        GridClientData dfltData = client.data();

        assertNotNull(dfltData);

        GridClientData namedData = client.data(CACHE_NAME);

        assertNotNull(namedData);

        assertTrue(dfltData.put("key1", "val1"));
        assertEquals("val1", dfltCache.get("key1"));

        assertTrue(dfltData.putAsync("key2", "val2").get());
        assertEquals("val2", dfltCache.get("key2"));

        assertTrue(namedData.put("key1", "val1"));
        assertEquals("val1", namedCache.get("key1"));

        assertTrue(namedData.putAsync("key2", "val2").get());
        assertEquals("val2", namedCache.get("key2"));

        // Corner case: empty string used as both key and value.
        assertTrue(dfltData.put("", ""));
        assertEquals("", dfltCache.get(""));

        assertTrue(namedData.put("", ""));
        assertEquals("", namedCache.get(""));
    }

    /**
     * Checks client cache flags handling using two projections of the named cache:
     * one with {@code SKIP_STORE} switched on and one with it switched off again.
     *
     * @throws Exception If failed.
     */
    public void testCacheFlags() throws Exception {
        /* Note! Only 'SKIP_STORE' flag is validated. */
        final GridClientData data = client.data(CACHE_NAME);
        final GridClientData readData = data.flagsOn(GridClientCacheFlag.SKIP_STORE);
        final GridClientData writeData = readData.flagsOff(GridClientCacheFlag.SKIP_STORE);

        assertEquals(Collections.singleton(GridClientCacheFlag.SKIP_STORE), readData.flags());
        assertTrue(writeData.flags().isEmpty());

        for (int i = 0; i < 100; i++) {
            // Random key and value per iteration.
            String key = UUID.randomUUID().toString();
            Object val = UUID.randomUUID().toString();

            // Put entry into cache & store.
            assertTrue(writeData.put(key, val));

            assertEquals(val, readData.get(key));
            assertEquals(val, writeData.get(key));

            // Remove from cache, skip store.
            assertTrue(readData.remove(key));

            // The value comes back once read through the projection without SKIP_STORE.
            assertNull(readData.get(key));
            assertEquals(val, writeData.get(key));
            assertEquals(val, readData.get(key));

            // Remove from cache and from store.
            assertTrue(writeData.remove(key));

            assertNull(readData.get(key));
            assertNull(writeData.get(key));
        }
    }

    /**
     * Checks that sync and async bulk puts through the client are visible in the
     * server-side default and named caches.
     *
     * @throws Exception If failed.
     */
    public void testPutAll() throws Exception {
        GridClientData dfltData = client.data();

        assertNotNull(dfltData);

        GridClientData namedData = client.data(CACHE_NAME);

        assertNotNull(namedData);

        dfltData.putAll(F.asMap("key1", "val1", "key2", "val2"));

        Map<String, String> map = grid().<String, String>cache(null).getAll(F.asList("key1", "key2"));

        assertEquals(2, map.size());
        assertEquals("val1", map.get("key1"));
        assertEquals("val2", map.get("key2"));

        dfltData.putAllAsync(F.asMap("key3", "val3", "key4", "val4")).get();

        map = grid().<String, String>cache(null).getAll(F.asList("key3", "key4"));

        assertEquals(2, map.size());
        assertEquals("val3", map.get("key3"));
        assertEquals("val4", map.get("key4"));

        namedData.putAll(F.asMap("key1", "val1", "key2", "val2"));

        map = grid().<String, String>cache(CACHE_NAME).getAll(F.asList("key1", "key2"));

        assertEquals(2, map.size());
        assertEquals("val1", map.get("key1"));
        assertEquals("val2", map.get("key2"));

        namedData.putAllAsync(F.asMap("key3", "val3", "key4", "val4")).get();

        map = grid().<String, String>cache(CACHE_NAME).getAll(F.asList("key3", "key4"));

        assertEquals(2, map.size());
        assertEquals("val3", map.get("key3"));
        assertEquals("val4", map.get("key4"));
    }

    /**
     * @throws Exception If failed.
+ */ + public void testPutAllWithCornerCases() throws Exception { + final GridClientData dfltData = client.data(); + + assertNotNull(dfltData); + + GridClientData namedData = client.data(CACHE_NAME); + + assertNotNull(namedData); + + dfltData.putAll(F.asMap("", "val1")); + + assertEquals(F.asMap("", "val1"), grid().<String, String>cache(null).getAll(F.asList(""))); + + GridClientProtocol proto = clientConfiguration().getProtocol(); + + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + dfltData.putAll(Collections.singletonMap("key3", null)); + + return null; + } + }, proto == GridClientProtocol.TCP ? GridClientException.class : IllegalArgumentException.class, null); + + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + dfltData.putAll(Collections.singletonMap(null, "val2")); + + return null; + } + }, proto == GridClientProtocol.TCP ? GridClientException.class : IllegalArgumentException.class, null); + + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + dfltData.getAll(Collections.singleton(null)); + + return null; + } + }, proto == GridClientProtocol.TCP ? GridClientException.class : IllegalArgumentException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testGet() throws Exception { + GridClientData dfltData = client.data(); + + assertNotNull(dfltData); + + GridClientData namedData = client.data(CACHE_NAME); + + assertNotNull(namedData); + + assertTrue(grid().cache(null).putx("key", "val")); + + Assert.assertEquals("val", dfltData.get("key")); + Assert.assertEquals("val", dfltData.getAsync("key").get()); + + assertTrue(grid().cache(CACHE_NAME).putx("key", "val")); + + Assert.assertEquals("val", namedData.get("key")); + Assert.assertEquals("val", namedData.getAsync("key").get()); + } + + /** + * @throws Exception If failed. 
     */
    public void testGetAll() throws Exception {
        // Entries are put directly into the server-side caches and then read in bulk through the client.
        GridClientData dfltData = client.data();

        assertNotNull(dfltData);

        GridClientData namedData = client.data(CACHE_NAME);

        assertNotNull(namedData);

        assertTrue(grid().cache(null).putx("key1", "val1"));
        assertTrue(grid().cache(null).putx("key2", "val2"));

        Map<String, String> map = dfltData.getAll(F.asList("key1", "key2"));

        assertEquals(2, map.size());
        assertEquals("val1", map.get("key1"));
        assertEquals("val2", map.get("key2"));

        assertTrue(grid().cache(null).putx("key3", "val3"));
        assertTrue(grid().cache(null).putx("key4", "val4"));

        map = dfltData.getAll(F.asList("key3", "key4"));

        assertEquals(2, map.size());
        assertEquals("val3", map.get("key3"));
        assertEquals("val4", map.get("key4"));

        // Single-key request.
        map = dfltData.getAll(F.asList("key1"));

        assertEquals(1, map.size());
        assertEquals("val1", map.get("key1"));

        // Same scenario for the named cache.
        assertTrue(grid().cache(CACHE_NAME).putx("key1", "val1"));
        assertTrue(grid().cache(CACHE_NAME).putx("key2", "val2"));

        map = namedData.getAll(F.asList("key1", "key2"));

        assertEquals(2, map.size());
        assertEquals("val1", map.get("key1"));
        assertEquals("val2", map.get("key2"));

        assertTrue(grid().cache(CACHE_NAME).putx("key3", "val3"));
        assertTrue(grid().cache(CACHE_NAME).putx("key4", "val4"));

        map = namedData.getAll(F.asList("key3", "key4"));

        assertEquals(2, map.size());
        assertEquals("val3", map.get("key3"));
        assertEquals("val4", map.get("key4"));

        map = namedData.getAll(F.asList("key1"));

        assertEquals(1, map.size());
        assertEquals("val1", map.get("key1"));

    }

    /**
     * @throws Exception If failed.
     */
    public void testRemove() throws Exception {
        // Sync and async removes must report true for existing keys and false for missing ones.
        GridClientData dfltData = client.data();

        assertNotNull(dfltData);

        GridClientData namedData = client.data(CACHE_NAME);

        assertNotNull(namedData);

        assertTrue(grid().cache(null).putx("key1", "val1"));
        assertTrue(grid().cache(null).putx("key2", "val2"));

        assertTrue(dfltData.remove("key1"));
        assertTrue(dfltData.removeAsync("key2").get());
        assertFalse(dfltData.remove("wrongKey"));
        assertFalse(dfltData.removeAsync("wrongKey").get());

        assertNull(grid().cache(null).get("key1"));
        assertNull(grid().cache(null).get("key2"));

        // Same scenario for the named cache.
        assertTrue(grid().cache(CACHE_NAME).putx("key1", "val1"));
        assertTrue(grid().cache(CACHE_NAME).putx("key2", "val2"));

        assertTrue(namedData.remove("key1"));
        assertTrue(namedData.removeAsync("key2").get());
        assertFalse(namedData.remove("wrongKey"));
        assertFalse(namedData.removeAsync("wrongKey").get());

        assertNull(grid().cache(CACHE_NAME).get("key1"));
        assertNull(grid().cache(CACHE_NAME).get("key2"));
    }

    /**
     * Checks that operations done through a {@code SKIP_STORE} projection of the named cache
     * leave the underlying store map untouched.
     *
     * @throws Exception If failed.
     */
    public void testSkipStoreFlag() throws Exception {
        GridClientData namedData = client.data(CACHE_NAME).flagsOn(GridClientCacheFlag.SKIP_STORE);

        // test keyA
        // The entry is written by the server cache, removed via the client; the store still holds it.
        assertTrue(grid().cache(CACHE_NAME).putx("keyA", "valA"));
        assertTrue(namedData.remove("keyA"));
        assertEquals("valA", cacheStores.get(CACHE_NAME).map.get("keyA"));
        assertNull(namedData.get("keyA"));

        // test keyX
        // The entry is written via the client; it is readable back but absent from the store.
        assertTrue(namedData.put("keyX", "valX"));
        assertEquals("valX", namedData.get("keyX"));
        assertNull(cacheStores.get(CACHE_NAME).map.get("keyX"));
    }

    /**
     * @throws Exception If failed.
     */
    public void testSkipSwapFlag() throws Exception {
        GridClientData namedData = client.data(CACHE_NAME);

        assertTrue(namedData.put("k", "v"));

        // Evict the entry from the server-side cache before reading it back through the client.
        assertTrue(grid().cache(CACHE_NAME).evict("k"));

        // With both SKIP_SWAP and SKIP_STORE the value is not found; with SKIP_STORE alone it is.
        assertNull(namedData.flagsOn(GridClientCacheFlag.SKIP_SWAP, GridClientCacheFlag.SKIP_STORE).get("k"));
        assertEquals("v", namedData.flagsOn(GridClientCacheFlag.SKIP_STORE).get("k"));
    }

    /**
     * Checks that sync and async bulk removes through the client delete entries from
     * the server-side default and named caches.
     *
     * @throws Exception If failed.
     */
    public void testRemoveAll() throws Exception {
        GridClientData dfltData = client.data();

        assertNotNull(dfltData);

        GridClientData namedData = client.data(CACHE_NAME);

        assertNotNull(namedData);

        assertTrue(grid().cache(null).putx("key1", "val1"));
        assertTrue(grid().cache(null).putx("key2", "val2"));
        assertTrue(grid().cache(null).putx("key3", "val3"));
        assertTrue(grid().cache(null).putx("key4", "val4"));

        dfltData.removeAll(F.asList("key1", "key2"));
        dfltData.removeAllAsync(F.asList("key3", "key4")).get();

        assertNull(grid().cache(null).get("key1"));
        assertNull(grid().cache(null).get("key2"));
        assertNull(grid().cache(null).get("key3"));
        assertNull(grid().cache(null).get("key4"));

        // Same scenario for the named cache.
        assertTrue(grid().cache(CACHE_NAME).putx("key1", "val1"));
        assertTrue(grid().cache(CACHE_NAME).putx("key2", "val2"));
        assertTrue(grid().cache(CACHE_NAME).putx("key3", "val3"));
        assertTrue(grid().cache(CACHE_NAME).putx("key4", "val4"));

        namedData.removeAll(F.asList("key1", "key2"));
        namedData.removeAllAsync(F.asList("key3", "key4")).get();

        assertNull(grid().cache(CACHE_NAME).get("key1"));
        assertNull(grid().cache(CACHE_NAME).get("key2"));
        assertNull(grid().cache(CACHE_NAME).get("key3"));
        assertNull(grid().cache(CACHE_NAME).get("key4"));
    }

    /**
     * @throws Exception If failed.
+ */ + public void testReplace() throws Exception { + GridClientData dfltData = client.data(); + + assertNotNull(dfltData); + + GridClientData namedData = client.data(CACHE_NAME); + + assertNotNull(namedData); + + assertFalse(dfltData.replace("key1", "val1")); + assertTrue(grid().cache(null).putx("key1", "val1")); + assertTrue(dfltData.replace("key1", "val2")); + assertEquals("val2", grid().cache(null).get("key1")); + + assertFalse(dfltData.replace("key2", "val1")); + assertTrue(grid().cache(null).putx("key2", "val1")); + assertTrue(dfltData.replace("key2", "val2")); + assertEquals("val2", grid().cache(null).get("key2")); + + grid().cache(null).removeAll(F.asList("key1", "key2")); + + assertFalse(dfltData.replaceAsync("key1", "val1").get()); + assertTrue(grid().cache(null).putx("key1", "val1")); + assertTrue(dfltData.replaceAsync("key1", "val2").get()); + assertEquals("val2", grid().cache(null).get("key1")); + + assertFalse(dfltData.replaceAsync("key2", "val1").get()); + assertTrue(grid().cache(null).putx("key2", "val1")); + assertTrue(dfltData.replaceAsync("key2", "val2").get()); + assertEquals("val2", grid().cache(null).get("key2")); + + assertFalse(namedData.replace("key1", "val1")); + assertTrue(grid().cache(CACHE_NAME).putx("key1", "val1")); + assertTrue(namedData.replace("key1", "val2")); + assertEquals("val2", grid().cache(CACHE_NAME).get("key1")); + + assertFalse(namedData.replaceAsync("key2", "val1").get()); + assertTrue(grid().cache(CACHE_NAME).putx("key2", "val1")); + assertTrue(namedData.replaceAsync("key2", "val2").get()); + assertEquals("val2", grid().cache(CACHE_NAME).get("key2")); + + grid().cache(CACHE_NAME).removeAll(F.asList("key1", "key2")); + + assertFalse(namedData.replaceAsync("key1", "val1").get()); + assertTrue(grid().cache(CACHE_NAME).putx("key1", "val1")); + assertTrue(namedData.replaceAsync("key1", "val2").get()); + assertEquals("val2", grid().cache(CACHE_NAME).get("key1")); + + assertFalse(namedData.replaceAsync("key2", "val1").get()); 
+ assertTrue(grid().cache(CACHE_NAME).putx("key2", "val1")); + assertTrue(namedData.replaceAsync("key2", "val2").get()); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("NullableProblems") + public void testCompareAndSet() throws Exception { + GridClientData[] datas = new GridClientData[] { + client.data(), + client.data(CACHE_NAME) + }; + + assertNotNull(datas[0]); + assertNotNull(datas[1]); + + GridCache[] caches = new GridCache[] { + grid().cache(null), + grid().cache(CACHE_NAME) + }; + + for (int i = 0; i < datas.length; i++) { + GridClientData data = datas[i]; + GridCache<String, String> cache = (GridCache<String, String>)caches[i]; + + assertFalse(data.cas("key", null, null)); + assertTrue(cache.putx("key", "val")); + assertTrue(data.cas("key", null, null)); + assertNull(cache.get("key")); + + assertFalse(data.cas("key", null, "val")); + assertTrue(cache.putx("key", "val")); + assertFalse(data.cas("key", null, "wrongVal")); + assertEquals("val", cache.get("key")); + assertTrue(data.cas("key", null, "val")); + assertNull(cache.get("key")); + + assertTrue(data.cas("key", "val", null)); + assertEquals("val", cache.get("key")); + assertFalse(data.cas("key", "newVal", null)); + assertEquals("val", cache.get("key")); + assertTrue(cache.removex("key")); + + assertFalse(data.cas("key", "val1", "val2")); + assertTrue(cache.putx("key", "val2")); + assertFalse(data.cas("key", "val1", "wrongVal")); + assertEquals("val2", cache.get("key")); + assertTrue(data.cas("key", "val1", "val2")); + assertEquals("val1", cache.get("key")); + assertTrue(cache.removex("key")); + + assertFalse(data.casAsync("key", null, null).get()); + assertTrue(cache.putx("key", "val")); + assertTrue(data.casAsync("key", null, null).get()); + assertNull(cache.get("key")); + + assertFalse(data.casAsync("key", null, "val").get()); + assertTrue(cache.putx("key", "val")); + assertFalse(data.casAsync("key", null, "wrongVal").get()); + assertEquals("val", cache.get("key")); + 
assertTrue(data.casAsync("key", null, "val").get()); + assertNull(cache.get("key")); + + assertTrue(data.casAsync("key", "val", null).get()); + assertEquals("val", cache.get("key")); + assertFalse(data.casAsync("key", "newVal", null).get()); + assertEquals("val", cache.get("key")); + assertTrue(cache.removex("key")); + + assertFalse(data.casAsync("key", "val1", "val2").get()); + assertTrue(cache.putx("key", "val2")); + assertFalse(data.casAsync("key", "val1", "wrongVal").get()); + assertEquals("val2", cache.get("key")); + assertTrue(data.casAsync("key", "val1", "val2").get()); + assertEquals("val1", cache.get("key")); + assertTrue(cache.removex("key")); + } + } + + /** + * @throws Exception If failed. + */ + public void testMetrics() throws Exception { + GridClientData dfltData = client.data(); + GridClientData namedData = client.data(CACHE_NAME); + + grid().cache(null).mxBean().clear(); + grid().cache(CACHE_NAME).mxBean().clear(); + + grid().cache(null).putx("key1", "val1"); + grid().cache(null).putx("key2", "val2"); + grid().cache(null).putx("key2", "val3"); + + assertEquals("val1", grid().cache(null).get("key1")); + assertEquals("val3", grid().cache(null).get("key2")); + assertEquals("val3", grid().cache(null).get("key2")); + + grid().cache(CACHE_NAME).putx("key1", "val1"); + grid().cache(CACHE_NAME).putx("key2", "val2"); + grid().cache(CACHE_NAME).putx("key2", "val3"); + + assertEquals("val1", grid().cache(CACHE_NAME).get("key1")); + assertEquals("val3", grid().cache(CACHE_NAME).get("key2")); + assertEquals("val3", grid().cache(CACHE_NAME).get("key2")); + + GridClientDataMetrics m = dfltData.metrics(); + + CacheMetrics metrics = grid().cache(null).metrics(); + + assertNotNull(m); + assertEquals(metrics.getCacheGets(), m.reads()); + assertEquals(metrics.getCachePuts(), m.writes()); + + m = dfltData.metricsAsync().get(); + + assertNotNull(m); + assertEquals(metrics.getCacheGets(), m.reads()); + assertEquals(metrics.getCachePuts(), m.writes()); + + m = 
namedData.metrics(); + + metrics = grid().cache(CACHE_NAME).metrics(); + + assertNotNull(m); + assertEquals(metrics.getCacheGets(), m.reads()); + assertEquals(metrics.getCachePuts(), m.writes()); + + m = namedData.metricsAsync().get(); + + assertNotNull(m); + assertEquals(metrics.getCacheGets(), m.reads()); + assertEquals(metrics.getCachePuts(), m.writes()); + } + + /** + * @throws Exception If failed. + */ + public void testAppendPrepend() throws Exception { + List<GridClientData> datas = Arrays.asList(client.data(), client.data(CACHE_NAME)); + + String key = UUID.randomUUID().toString(); + + for (GridClientData data : datas) { + assertNotNull(data); + + data.remove(key); + + assertFalse(data.append(key, ".suffix")); + assertTrue(data.put(key, "val")); + assertTrue(data.append(key, ".suffix")); + assertEquals("val.suffix", data.get(key)); + assertTrue(data.remove(key)); + assertFalse(data.append(key, ".suffix")); + + data.remove(key); + + assertFalse(data.prepend(key, "postfix.")); + assertTrue(data.put(key, "val")); + assertTrue(data.prepend(key, "postfix.")); + assertEquals("postfix.val", data.get(key)); + assertTrue(data.remove(key)); + assertFalse(data.prepend(key, "postfix.")); + } + + // TCP protocol supports work with collections. + if (protocol() != GridClientProtocol.TCP) + return; + + List<String> origList = new ArrayList<>(Arrays.asList("1", "2")); // This list should be modifiable. 
+ List<String> newList = Arrays.asList("3", "4"); + + Map<String, String> origMap = F.asMap("1", "a1", "2", "a2"); + Map<String, String> newMap = F.asMap("2", "b2", "3", "b3"); + + for (GridClientData data : datas) { + assertNotNull(data); + + data.remove(key); + + assertFalse(data.append(key, newList)); + assertTrue(data.put(key, origList)); + assertTrue(data.append(key, newList)); + assertEquals(Arrays.asList("1", "2", "3", "4"), data.get(key)); + + data.remove(key); + + assertFalse(data.prepend(key, newList)); + assertTrue(data.put(key, origList)); + assertTrue(data.prepend(key, newList)); + assertEquals(Arrays.asList("3", "4", "1", "2"), data.get(key)); + + data.remove(key); + + assertFalse(data.append(key, newMap)); + assertTrue(data.put(key, origMap)); + assertTrue(data.append(key, newMap)); + assertEquals(F.asMap("1", "a1", "2", "b2", "3", "b3"), data.get(key)); + + data.remove(key); + + assertFalse(data.prepend(key, newMap)); + assertTrue(data.put(key, origMap)); + assertTrue(data.prepend(key, newMap)); + assertEquals(F.asMap("1", "a1", "2", "a2", "3", "b3"), data.get(key)); + } + } + + /** + * @throws Exception If failed. + */ + public void testExecute() throws Exception { + String taskName = getTaskName(); + Object taskArg = getTaskArgument(); + + GridClientCompute compute = client.compute(); + + Assert.assertEquals(17, compute.execute(taskName, taskArg)); + Assert.assertEquals(17, compute.executeAsync(taskName, taskArg).get()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testNode() throws Exception { + GridClientCompute compute = client.compute(); + + assertNull(compute.refreshNode(UUID.randomUUID(), true, false)); + assertNull(compute.refreshNode(UUID.randomUUID(), false, false)); + + GridClientNode node = compute.refreshNode(grid().localNode().id(), true, false); + + assertNotNull(node); + assertFalse(node.attributes().isEmpty()); + assertTrue(node.metrics() == null); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + assertEquals(4, node.caches().size()); + + Integer replica = grid().localNode().attribute(CacheConsistentHashAffinityFunction.DFLT_REPLICA_COUNT_ATTR_NAME); + + if (replica == null) + replica = CacheConsistentHashAffinityFunction.DFLT_REPLICA_COUNT; + + assertEquals((int)replica, node.replicaCount()); + + Map<String, GridClientCacheMode> caches = node.caches(); + + for (Map.Entry<String, GridClientCacheMode> e : caches.entrySet()) { + if (e.getKey() == null || CACHE_NAME.equals(e.getKey())) + assertEquals(GridClientCacheMode.LOCAL, e.getValue()); + else if ("replicated".equals(e.getKey())) + assertEquals(GridClientCacheMode.REPLICATED, e.getValue()); + else if ("partitioned".equals(e.getKey())) + assertEquals(GridClientCacheMode.PARTITIONED, e.getValue()); + else + fail("Unexpected cache name: " + e.getKey()); + } + + node = compute.refreshNode(grid().localNode().id(), false, false); + + assertNotNull(node); + assertTrue(node.attributes().isEmpty()); + assertTrue(node.metrics() == null); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + assertEquals(4, node.caches().size()); + + caches = node.caches(); + + for (Map.Entry<String, GridClientCacheMode> e : caches.entrySet()) { + if (e.getKey() == null || CACHE_NAME.equals(e.getKey())) + assertEquals(GridClientCacheMode.LOCAL, e.getValue()); + else if ("replicated".equals(e.getKey())) + assertEquals(GridClientCacheMode.REPLICATED, e.getValue()); + else if 
("partitioned".equals(e.getKey())) + assertEquals(GridClientCacheMode.PARTITIONED, e.getValue()); + else + fail("Unexpected cache name: " + e.getKey()); + } + + node = compute.refreshNode(grid().localNode().id(), false, true); + + assertNotNull(node); + assertTrue(node.attributes().isEmpty()); + assertFalse(node.metrics() == null); + assertTrue(node.metrics().getCurrentActiveJobs() != -1); + assertTrue(node.metrics().getCurrentIdleTime() != -1); + assertTrue(node.metrics().getLastUpdateTime() != -1); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + assertEquals(4, node.caches().size()); + + caches = node.caches(); + + for (Map.Entry<String, GridClientCacheMode> e : caches.entrySet()) { + if (e.getKey() == null || CACHE_NAME.equals(e.getKey())) + assertEquals(GridClientCacheMode.LOCAL, e.getValue()); + else if ("replicated".equals(e.getKey())) + assertEquals(GridClientCacheMode.REPLICATED, e.getValue()); + else if ("partitioned".equals(e.getKey())) + assertEquals(GridClientCacheMode.PARTITIONED, e.getValue()); + else + fail("Unexpected cache name: " + e.getKey()); + } + + assertNull(compute.refreshNodeAsync(UUID.randomUUID(), true, false).get()); + assertNull(compute.refreshNodeAsync(UUID.randomUUID(), false, false).get()); + + node = compute.refreshNodeAsync(grid().localNode().id(), true, false).get(); + + assertNotNull(node); + assertFalse(node.attributes().isEmpty()); + assertTrue(node.metrics() == null); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + assertEquals(4, node.caches().size()); + + caches = node.caches(); + + for (Map.Entry<String, GridClientCacheMode> e : caches.entrySet()) { + if (e.getKey() == null || CACHE_NAME.equals(e.getKey())) + assertEquals(GridClientCacheMode.LOCAL, e.getValue()); + else if ("replicated".equals(e.getKey())) + assertEquals(GridClientCacheMode.REPLICATED, e.getValue()); + else if ("partitioned".equals(e.getKey())) + 
assertEquals(GridClientCacheMode.PARTITIONED, e.getValue()); + else + fail("Unexpected cache name: " + e.getKey()); + } + + node = compute.refreshNodeAsync(grid().localNode().id(), false, false).get(); + + assertNotNull(node); + assertTrue(node.attributes().isEmpty()); + assertTrue(node.metrics() == null); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + assertEquals(4, node.caches().size()); + + caches = node.caches(); + + for (Map.Entry<String, GridClientCacheMode> e : caches.entrySet()) { + if (e.getKey() == null || CACHE_NAME.equals(e.getKey())) + assertEquals(GridClientCacheMode.LOCAL, e.getValue()); + else if ("replicated".equals(e.getKey())) + assertEquals(GridClientCacheMode.REPLICATED, e.getValue()); + else if ("partitioned".equals(e.getKey())) + assertEquals(GridClientCacheMode.PARTITIONED, e.getValue()); + else + fail("Unexpected cache name: " + e.getKey()); + } + + node = compute.refreshNodeAsync(grid().localNode().id(), false, true).get(); + + assertNotNull(node); + assertTrue(node.attributes().isEmpty()); + assertFalse(node.metrics() == null); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + assertEquals(4, node.caches().size()); + + caches = node.caches(); + + for (Map.Entry<String, GridClientCacheMode> e : caches.entrySet()) { + if (e.getKey() == null || CACHE_NAME.equals(e.getKey())) + assertEquals(GridClientCacheMode.LOCAL, e.getValue()); + else if ("replicated".equals(e.getKey())) + assertEquals(GridClientCacheMode.REPLICATED, e.getValue()); + else if ("partitioned".equals(e.getKey())) + assertEquals(GridClientCacheMode.PARTITIONED, e.getValue()); + else + fail("Unexpected cache name: " + e.getKey()); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTopology() throws Exception { + GridClientCompute compute = client.compute(); + + List<GridClientNode> top = compute.refreshTopology(true, true); + + assertNotNull(top); + assertEquals(1, top.size()); + + GridClientNode node = F.first(top); + + assertNotNull(node); + assertFalse(node.attributes().isEmpty()); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + assertNotNull(node.metrics()); + + top = compute.refreshTopology(false, false); + + node = F.first(top); + + assertNotNull(top); + assertEquals(1, top.size()); + assertNull(node.metrics()); + assertTrue(node.attributes().isEmpty()); + + node = F.first(top); + + assertNotNull(node); + assertTrue(node.attributes().isEmpty()); + assertNull(node.metrics()); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + + top = compute.refreshTopologyAsync(true, true).get(); + + assertNotNull(top); + assertEquals(1, top.size()); + + node = F.first(top); + + assertNotNull(node); + assertFalse(node.attributes().isEmpty()); + assertNotNull(node.metrics()); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + + top = compute.refreshTopologyAsync(false, false).get(); + + assertNotNull(top); + assertEquals(1, top.size()); + + node = F.first(top); + + assertNotNull(node); + assertTrue(node.attributes().isEmpty()); + assertNull(node.metrics()); + assertNotNull(node.tcpAddresses()); + assertEquals(grid().localNode().id(), node.nodeId()); + } + + /** ++ * @throws Exception If failed. ++ */ ++ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") ++ public void testLog() throws Exception { ++ final GridClientCompute compute = client.compute(); ++ ++ /* Usually this log file is created by log4j, but some times it doesn't exists. 
*/ ++ new File(U.getGridGainHome(), "work/log/gridgain.log").createNewFile(); ++ ++ List<String> log = compute.log(6, 7); ++ assertNotNull(log); ++ ++ log = compute.log(-7, -6); ++ assertNotNull(log); ++ ++ log = compute.log(-6, -7); ++ assertNotNull(log); ++ assertTrue(log.isEmpty()); ++ ++ String path = "work/log/gridgain.log." + System.currentTimeMillis(); ++ ++ File file = new File(U.getGridGainHome(), path); ++ ++ assert !file.exists(); ++ ++ FileWriter writer = new FileWriter(file); ++ ++ String sep = System.getProperty("line.separator"); ++ ++ writer.write("Line 1" + sep); ++ writer.write(sep); ++ writer.write("Line 2" + sep); ++ writer.write("Line 3" + sep); ++ ++ writer.flush(); ++ writer.close(); ++ ++ log = compute.log(path, -1, -1); ++ assertNotNull(log); ++ assertEquals(1, log.size()); ++ assertEquals("Line 3", log.get(0)); ++ ++ // Indexing from 0. ++ log = compute.log(path, 2, 3); ++ assertNotNull(log); ++ assertEquals(2, log.size()); ++ assertEquals("Line 2", log.get(0)); ++ assertEquals("Line 3", log.get(1)); ++ ++ // Backward reading. 
++ log = compute.log(path, -3, -1); ++ assertNotNull(log); ++ assertEquals(3, log.size()); ++ assertEquals("", log.get(0)); ++ assertEquals("Line 2", log.get(1)); ++ assertEquals("Line 3", log.get(2)); ++ ++ log = compute.log(path, -4, -3); ++ assertNotNull(log); ++ assertEquals(2, log.size()); ++ assertEquals("Line 1", log.get(0)); ++ assertEquals("", log.get(1)); ++ ++ log = compute.log(path, -5, -8); ++ assertNotNull(log); ++ assertEquals(0, log.size()); ++ ++ assert file.delete(); ++ ++ log = compute.log(TEST_LOG_PATH, -9, -5); ++ assertNotNull(log); ++ assertEquals(5, log.size()); ++ ++ log = compute.log(TEST_LOG_PATH, -1, -1); ++ assertNotNull(log); ++ assertEquals(1, log.size()); ++ assertEquals("[14:23:34,336][INFO ][main][GridTaskContinuousMapperSelfTest] >>> Stopping test: " + ++ "testContinuousMapperNegative in 2633 ms <<<", ++ log.get(0)); ++ ++ log = compute.log(TEST_LOG_PATH, -13641, -13640); ++ assertNotNull(log); ++ assertEquals(1, log.size()); ++ assertEquals("[14:14:22,515][INFO ][main][GridListenActorSelfTest] ", log.get(0)); ++ ++ assertThrows( ++ log(), ++ new Callable<Object>() { ++ @Override public Object call() throws Exception { ++ compute.log("wrong/path", -1, -1); ++ ++ return null; ++ } ++ }, ++ GridClientException.class, ++ null ++ ); ++ ++ assertThrows( ++ log(), ++ new Callable<Object>() { ++ @Override public Object call() throws Exception { ++ new File(U.getGridGainHome(), "work/security.log").createNewFile(); ++ ++ compute.log("work/log/../security.log", -1, -1); ++ ++ return null; ++ } ++ }, ++ GridClientException.class, ++ null ++ ); ++ } ++ ++ /** + * Test if all user objects passed interception. + * + * @throws Exception If failed. 
+ */ + public void testInterception() throws Exception { + grid().cache(null).put("rem1", "rem1"); + + GridClientData data = client.data(); + + assertNotNull(data); + + overwriteIntercepted = true; + + data.put("key1", "val1"); + data.putAll(F.asMap("key2", "val2", "key3", "val3")); + data.remove("rem1"); + data.replace("key1", "nval1"); + + client.compute().execute(getTaskName(), getTaskArgument()); + + for (Object obj : Arrays.asList( + "rem1", "rem1", "key1", "key2", "key2", "val2", "key3", "val3", "rem1", "key1", "nval1", + getTaskArgument())) { + + assert INTERCEPTED_OBJECTS.containsKey(obj); + } + + assert ("nval1" + INTERCEPTED_SUF).equals(grid().cache(null).get("key1" + INTERCEPTED_SUF)); + assert ("val2" + INTERCEPTED_SUF).equals(grid().cache(null).get("key2" + INTERCEPTED_SUF)); + assert "rem1".equals(grid().cache(null).get("rem1")); + + overwriteIntercepted = false; + } + + /** + * Test task. + */ + private static class TestTask extends ComputeTaskSplitAdapter<List<Object>, Integer> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, List<Object> list) { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(); + + if (list != null) + for (final Object val : list) + jobs.add(new ComputeJobAdapter() { + @Override public Object execute() { + try { + Thread.sleep(1); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + + return val == null ? 0 : val.toString().length(); + } + }); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) { + int sum = 0; + + for (ComputeJobResult res : results) + sum += res.<Integer>getData(); + + return sum; + } + } + + /** + * Test task that sleeps 5 seconds. + */ + private static class SleepTestTask extends ComputeTaskSplitAdapter<List<Object>, Integer> { + /** {@inheritDoc} */ + @Override protected Collection<? 
extends ComputeJob> split(int gridSize, List<Object> list) + { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(); + + if (list != null) + for (final Object val : list) + jobs.add(new ComputeJobAdapter() { + @Override public Object execute() { + try { + Thread.sleep(5000); + + return val == null ? 0 : val.toString().length(); + } + catch (InterruptedException ignored) { + return -1; + } + } + }); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) { + int sum = 0; + + for (ComputeJobResult res : results) + sum += res.<Integer>getData(); + + return sum; + } + } + + /** + * Http test task with restriction to string arguments only. + */ + protected static class HttpTestTask extends ComputeTaskSplitAdapter<String, Integer> { + private final TestTask delegate = new TestTask(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) { + if (arg.endsWith("intercepted")) + arg = arg.substring(0, arg.length() - 11); + + JSON json = JSONSerializer.toJSON(arg); + + List list = json.isArray() ? JSONArray.toList((JSONArray)json, String.class, new JsonConfig()) : null; + + return delegate.split(gridSize, list); + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) { + return delegate.reduce(results); + } + } + + /** + * Http wrapper for sleep task. + */ + protected static class SleepHttpTestTask extends ComputeTaskSplitAdapter<String, Integer> { + private final SleepTestTask delegate = new SleepTestTask(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) { + JSON json = JSONSerializer.toJSON(arg); + + List list = json.isArray() ? 
JSONArray.toList((JSONArray)json, String.class, new JsonConfig()) : null; + + return delegate.split(gridSize, list); + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) { + return delegate.reduce(results); + } + } + + /** + * Simple HashMap based cache store emulation. + */ + private static class HashMapStore extends CacheStoreAdapter<Object, Object> { + /** Map for cache store. */ + private final Map<Object, Object> map = new HashMap<>(); + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { + for (Map.Entry e : map.entrySet()) { + clo.apply(e.getKey(), e.getValue()); + } + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(javax.cache.Cache.Entry<? extends Object, ? extends Object> e) { + map.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + map.remove(key); + } + } +}