http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/TaskEventSubjectIdSelfTest.java ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java index 64361d3,0000000..55d13dc mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java @@@ -1,617 -1,0 +1,622 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; ++import org.apache.ignite.client.balancer.*; ++import org.apache.ignite.client.impl.*; ++import org.apache.ignite.client.ssl.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.client.balancer.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.internal.client.ssl.*; ++import org.apache.ignite.internal.processors.affinity.*; ++import org.apache.ignite.internal.util.typedef.*; ++import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; +import org.junit.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.testframework.GridTestUtils.*; + +/** + * + */ +public abstract class ClientAbstractMultiThreadedSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Partitioned cache name. */ + protected static final String PARTITIONED_CACHE_NAME = "partitioned"; + + /** Partitioned cache with async commit and backup name. */ + protected static final String PARTITIONED_ASYNC_BACKUP_CACHE_NAME = "partitioned-async-backup"; + + /** Replicated cache name. */ + private static final String REPLICATED_CACHE_NAME = "replicated"; + + /** Replicated cache with async commit name. */ + private static final String REPLICATED_ASYNC_CACHE_NAME = "replicated-async"; + + /** Nodes count. */ + protected static final int NODES_CNT = 5; + + /** Thread count to run tests. */ + private static final int THREAD_CNT = 20; + + /** Count of tasks to run. */ + private static final int TASK_EXECUTION_CNT = 50000; + + /** Count of cache puts in tests. */ + private static final int CACHE_PUT_CNT = 10000; + + /** Topology update frequency. */ + private static final int TOP_REFRESH_FREQ = 1000; + + /** Info messages will be printed each 5000 iterations. */ + private static final int STATISTICS_PRINT_STEP = 5000; + + /** Host. */ + public static final String HOST = "127.0.0.1"; + + /** Base for tcp rest ports. */ + public static final int REST_TCP_PORT_BASE = 12345; + + static { + System.setProperty("CLIENTS_MODULE_PATH", U.resolveGridGainPath("modules/clients").getAbsolutePath()); + } + + /** Client instance for each test. */ + private GridClient client; + + /** + * @return Client protocol that should be used. + */ + protected abstract GridClientProtocol protocol(); + + /** + * @return Server address to create first connection. + */ + protected abstract String serverAddress(); + + /** + * @return Whether SSL should be used. + */ + protected abstract boolean useSsl(); + + /** + * @return SSL context factory to use if SSL is enabled. + */ + protected abstract GridSslContextFactory sslContextFactory(); + + /** + * @return Count of iterations for sync commit test. + */ + protected int syncCommitIterCount() { + return 1000; + } + + /** + * @return Topology refresh frequency interval. + */ + protected int topologyRefreshFrequency() { + return TOP_REFRESH_FREQ; + } + + /** + * @return Max connection idle time. + */ + protected int maxConnectionIdleTime() { + return 5000; + } + + /** + * @return Number of tasks that should be executed during test. + */ + protected int taskExecutionCount() { + return TASK_EXECUTION_CNT; + } + + /** + * @return Number of puts to the cache. + */ + protected int cachePutCount() { + return CACHE_PUT_CNT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setLocalHost(HOST); + + assert c.getClientConnectionConfiguration() == null; + + ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); + + clientCfg.setRestTcpPort(REST_TCP_PORT_BASE); + + if (useSsl()) { + clientCfg.setRestTcpSslEnabled(true); + + clientCfg.setRestTcpSslContextFactory(sslContextFactory()); + } + + c.setClientConnectionConfiguration(clientCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + c.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(PARTITIONED_CACHE_NAME), + cacheConfiguration(REPLICATED_CACHE_NAME), cacheConfiguration(PARTITIONED_ASYNC_BACKUP_CACHE_NAME), + cacheConfiguration(REPLICATED_ASYNC_CACHE_NAME)); + + return c; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setDistributionMode(NEAR_PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + + if (cacheName == null) + cfg.setCacheMode(LOCAL); + else if (PARTITIONED_CACHE_NAME.equals(cacheName)) { + cfg.setCacheMode(PARTITIONED); + + cfg.setBackups(0); + } + else if (PARTITIONED_ASYNC_BACKUP_CACHE_NAME.equals(cacheName)) { + cfg.setCacheMode(PARTITIONED); + + cfg.setBackups(1); + } + else + cfg.setCacheMode(REPLICATED); + + cfg.setName(cacheName); + + if (cacheName != null && !cacheName.contains("async")) + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + client = GridClientFactory.start(clientConfiguration()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + GridClientFactory.stop(client.id(), false); + + client = null; + } + + /** + * @throws Exception If failed. + */ + public void testSyncCommitFlagReplicated() throws Exception { + doTestSyncCommitFlag(client.data(REPLICATED_ASYNC_CACHE_NAME)); + } + + /** + * @throws Exception If failed. + */ + public void testSyncCommitFlagPartitioned() throws Exception { + doTestSyncCommitFlag(client.data(PARTITIONED_ASYNC_BACKUP_CACHE_NAME)); + } + + /** + * Extracts array from given iterator. + * + * @param nodes Iterator of nodes. + * @return Nodes array. + */ + private GridClientNode[] toArray(Iterator<? extends GridClientNode> nodes) { + ArrayList<GridClientNode> res = new ArrayList<>(); + + while (nodes.hasNext()) + res.add(nodes.next()); + + return res.toArray(new GridClientNode[res.size()]); + } + + /** + * Runs test on SYNC_COMMIT flag. + * + * @param data Client data to run test on. + * @throws Exception If failed. + */ + private void doTestSyncCommitFlag(final GridClientData data) throws Exception { + final String key = "k0"; + + Collection<UUID> affNodesIds = F.viewReadOnly( + grid(0).cache(data.cacheName()).affinity().mapKeyToPrimaryAndBackups(key), + F.node2id()); + + final GridClientData dataFirst = data.pinNodes(F.first(client.compute().nodes())); + + List<GridClientNode> affNodes = new ArrayList<>(); + + for (GridClientNode node : client.compute().nodes()) { + if (affNodesIds.contains(node.nodeId())) + affNodes.add(node); + } + + Assert.assertFalse(affNodes.isEmpty()); + + Iterator<? extends GridClientNode> it = affNodes.iterator(); + + final GridClientData dataOthers = data.pinNodes(it.next(), toArray(it)); + + for (int i = 0; i < syncCommitIterCount(); i++) { + final CountDownLatch l = new CountDownLatch(1); + + final String val = "v" + i; + + IgniteInternalFuture<?> f = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + l.await(); + + assertEquals(val, dataOthers.get(key)); + + return null; + } + }, THREAD_CNT); + + dataFirst.flagsOn(GridClientCacheFlag.SYNC_COMMIT).put(key, val); + + l.countDown(); + + f.get(); + } + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedTaskRun() throws Exception { + final AtomicLong cnt = new AtomicLong(); + + final AtomicReference<GridClientException> err = new AtomicReference<>(); + + final ConcurrentLinkedQueue<String> execQueue = new ConcurrentLinkedQueue<>(); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @Override + public void run() { + long processed; + + while ((processed = cnt.getAndIncrement()) < taskExecutionCount()) { + try { + if (processed > 0 && processed % STATISTICS_PRINT_STEP == 0) + info(">>>>>>> " + processed + " tasks finished."); + + String res = client.compute().execute(TestTask.class.getName(), null); + + execQueue.add(res); + } + catch (GridClientException e) { + err.compareAndSet(null, e); + } + } + } + }, THREAD_CNT, "client-task-request"); + + fut.get(); + + if (err.get() != null) + throw new Exception(err.get()); + + assertEquals(taskExecutionCount(), execQueue.size()); + + // With round-robin balancer each node must receive equal count of task requests. + Collection<String> executionIds = new HashSet<>(execQueue); + + assertTrue(executionIds.size() == NODES_CNT); + + Map<String, AtomicInteger> statisticsMap = new HashMap<>(); + + for (String id : executionIds) + statisticsMap.put(id, new AtomicInteger()); + + for (String id : execQueue) + statisticsMap.get(id).incrementAndGet(); + + info(">>>>>>> Execution statistics per node:"); + + for (Map.Entry<String, AtomicInteger> e : statisticsMap.entrySet()) + info(">>>>>>> " + e.getKey() + " run " + e.getValue().get() + " tasks"); + } + + /** + * @throws Exception If failed. + */ + public void test6Affinity() throws Exception { + GridClientData cache = client.data(PARTITIONED_CACHE_NAME); + UUID nodeId = cache.affinity("6"); + + info("Affinity node: " + nodeId); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreadedCachePut() throws Exception { + final AtomicLong keyCnt = new AtomicLong(); + + final AtomicReference<Exception> err = new AtomicReference<>(); + + final ConcurrentMap<String, T2<UUID, String>> puts = new ConcurrentHashMap<>(); + + final Map<UUID, Ignite> gridMap = new HashMap<>(); + + for (int i = 0; i < NODES_CNT; i++) { + Ignite g = grid(i); + + gridMap.put(g.cluster().localNode().id(), g); + } + + final Ignite ignite = F.first(gridMap.values()); + + assertEquals(NODES_CNT, client.compute().refreshTopology(false, false).size()); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @SuppressWarnings("OverlyStrongTypeCast") + @Override public void run() { + try { + GridClientData cache = client.data(PARTITIONED_CACHE_NAME); + + assertEquals(NODES_CNT, ((GridClientDataImpl)cache).projectionNodes().size()); + + long rawKey; + + while ((rawKey = keyCnt.getAndIncrement()) < cachePutCount()) { + String key = String.valueOf(rawKey); + + UUID nodeId = cache.affinity(key); + UUID srvNodeId = ignite.cluster().mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); + + if (!nodeId.equals(srvNodeId)) { + //GridClientDataAffinity clAff = + // ((GridClientConfiguration)getFieldValue(client, "cfg")). + // getDataConfiguration(PARTITIONED_CACHE_NAME).getAffinity(); + + //printAffinityState(gridMap.values()); + //info("Client affinity: " + clAff); + + info("Got wrong client mapping [key=" + key + ", exp=" + srvNodeId + + ", actual=" + nodeId + "]"); + } + + String val = "val" + rawKey; + + if (cache.put(key, val)) { + T2<UUID, String> old = puts.putIfAbsent(key, new T2<>(nodeId, val)); + + assert old == null : "Map contained entry [key=" + rawKey + ", entry=" + old + ']'; + } + } + } + catch (Exception e) { + err.compareAndSet(null, e); + } + } + }, THREAD_CNT, "client-cache-put"); + + fut.get(); + + if (err.get() != null) + throw new Exception(err.get()); + + assertEquals(cachePutCount(), puts.size()); + + // Now check that all puts went to primary nodes. + for (long i = 0; i < cachePutCount(); i++) { + String key = String.valueOf(i); + + ClusterNode node = ignite.cluster().mapKeyToNode(PARTITIONED_CACHE_NAME, key); + + if (!puts.get(key).get2().equals(gridMap.get(node.id()).cache(PARTITIONED_CACHE_NAME).peek(key))) { + // printAffinityState(gridMap.values()); + + failNotEquals("Node don't have value for key [nodeId=" + node.id() + ", key=" + key + "]", + puts.get(key).get2(), gridMap.get(node.id()).cache(PARTITIONED_CACHE_NAME).peek(key)); + } + + // Assert that client has properly determined affinity node. + if (!node.id().equals(puts.get(key).get1())) { + //GridClientDataAffinity clAff = + // ((GridClientConfiguration)getFieldValue(client, "cfg")). + // getDataConfiguration(PARTITIONED_CACHE_NAME).getAffinity(); + + //printAffinityState(gridMap.values()); + //info("Client affinity: " + clAff); + + UUID curAffNode = client.data(PARTITIONED_CACHE_NAME).affinity(key); + + failNotEquals( + "Got different mappings [key=" + key + ", currId=" + curAffNode + "]", + node.id(), puts.get(key).get1()); + } + + // Check that no other nodes see this key. + for (UUID id : F.view(gridMap.keySet(), F.notEqualTo(node.id()))) + assertNull("Got value in near cache.", gridMap.get(id).cache(PARTITIONED_CACHE_NAME).peek(key)); + } + + for (Ignite g : gridMap.values()) + g.cache(PARTITIONED_CACHE_NAME).clearAll(); + } + + /** + * @param grids Collection for Grids to print affinity info. + */ + private void printAffinityState(Iterable<Ignite> grids) { + for (Ignite g : grids) { + GridAffinityAssignmentCache affCache = getFieldValue( + ((IgniteKernal)g).internalCache(PARTITIONED_CACHE_NAME).context().affinity(), + "aff"); + + CacheAffinityFunction aff = getFieldValue(affCache, "aff"); + + info("Affinity [nodeId=" + g.cluster().localNode().id() + ", affinity=" + aff + "]"); + } + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000; + } + + /** + * Creates client that will try to connect to only first node in grid. + * + * @return Client. + */ + private GridClientConfiguration clientConfiguration() { + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setTopologyRefreshFrequency(topologyRefreshFrequency()); + cfg.setMaxConnectionIdleTime(maxConnectionIdleTime()); + + cfg.setProtocol(protocol()); + cfg.setServers(Arrays.asList(serverAddress())); + cfg.setBalancer(new GridClientRoundRobinBalancer()); + + if (useSsl()) + cfg.setSslContextFactory(sslContextFactory()); + + GridClientDataConfiguration loc = new GridClientDataConfiguration(); + + GridClientDataConfiguration partitioned = new GridClientDataConfiguration(); + partitioned.setName(PARTITIONED_CACHE_NAME); + partitioned.setAffinity(new GridClientPartitionAffinity()); + + GridClientDataConfiguration partitionedAsyncBackup = new GridClientDataConfiguration(); + partitionedAsyncBackup.setName(PARTITIONED_ASYNC_BACKUP_CACHE_NAME); + partitionedAsyncBackup.setAffinity(new GridClientPartitionAffinity()); + + GridClientDataConfiguration replicated = new GridClientDataConfiguration(); + replicated.setName(REPLICATED_CACHE_NAME); + + GridClientDataConfiguration replicatedAsync = new GridClientDataConfiguration(); + replicatedAsync.setName(REPLICATED_ASYNC_CACHE_NAME); + + cfg.setDataConfigurations(Arrays.asList(loc, partitioned, replicated, replicatedAsync, partitionedAsyncBackup)); + + return cfg; + } + + /** + * Test task. Returns a tuple in which first component is id of node that has split the task, + * and second component is count of nodes that executed jobs. + */ + private static class TestTask extends ComputeTaskSplitAdapter<Object, String> { + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Count of tasks this job was split to. */ + private int gridSize; + + /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) - throws IgniteCheckedException { ++ @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(gridSize); + + this.gridSize = gridSize; + + final String locNodeId = ignite.cluster().localNode().id().toString(); + + for (int i = 0; i < gridSize; i++) { + jobs.add(new ComputeJobAdapter() { + @Override public Object execute() { + return new IgniteBiTuple<>(locNodeId, 1); + } + }); + } + + return jobs; + } + + /** {@inheritDoc} */ - @Override public String reduce(List<ComputeJobResult> results) throws IgniteCheckedException { ++ @Override public String reduce(List<ComputeJobResult> results) { + int sum = 0; + + String locNodeId = null; + + for (ComputeJobResult res : results) { + IgniteBiTuple<String, Integer> part = res.getData(); + + if (locNodeId == null) + locNodeId = part.get1(); + + Integer i = part.get2(); + + if (i != null) + sum += i; + } + + assert gridSize == sum; + + return locNodeId; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java index 80a0248,0000000..fcd92d0 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java @@@ -1,156 -1,0 +1,156 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.configuration.*; ++import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; - import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.internal.client.GridClientProtocol.*; +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * Tests that client is able to connect to a grid with only default cache enabled. + */ +public class ClientDefaultCacheSelfTest extends GridCommonAbstractTest { + /** Path to jetty config configured with SSL. */ + private static final String REST_JETTY_CFG = "modules/clients/src/test/resources/jetty/rest-jetty.xml"; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Host. */ + private static final String HOST = "127.0.0.1"; + + /** Port. */ + private static final int TCP_PORT = 11211; + + /** Cached local node id. */ + private UUID locNodeId; + + /** Http port. */ + private static final int HTTP_PORT = 8081; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty(IGNITE_JETTY_PORT, String.valueOf(HTTP_PORT)); + + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(); + + System.clearProperty (IGNITE_JETTY_PORT); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + locNodeId = grid().localNode().id(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + assert cfg.getClientConnectionConfiguration() == null; + + ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); + + clientCfg.setRestJettyPath(REST_JETTY_CFG); + + cfg.setClientConnectionConfiguration(clientCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(defaultCacheConfiguration()); + + return cfg; + } + + /** + * @return Client. + * @throws GridClientException In case of error. + */ + private GridClient clientTcp() throws GridClientException { + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setProtocol(TCP); + cfg.setServers(getServerList(TCP_PORT)); + cfg.setDataConfigurations(Collections.singleton(new GridClientDataConfiguration())); + + GridClient gridClient = GridClientFactory.start(cfg); + + assert F.exist(gridClient.compute().nodes(), new IgnitePredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode n) { + return n.nodeId().equals(locNodeId); + } + }); + + return gridClient; + } + + /** + * Builds list of connection strings with few different ports. + * Used to avoid possible failures in case of port range active. + * + * @param startPort Port to start list from. + * @return List of client connection strings. + */ + private Collection<String> getServerList(int startPort) { + Collection<String> srvs = new ArrayList<>(); + + for (int i = startPort; i < startPort + 10; i++) + srvs.add(HOST + ":" + i); + + return srvs; + } + + /** + * @throws Exception If failed. + */ + public void testTcp() throws Exception { + try { + boolean putRes = cache().putx("key", 1); + + assert putRes : "Put operation failed"; + + GridClient client = clientTcp(); + + Integer val = client.data().<String, Integer>get("key"); + + assert val != null; + + assert val == 1; + } + finally { + GridClientFactory.stopAll(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientGetAffinityTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientGetAffinityTask.java index f3b5490,0000000..b748cbd mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientGetAffinityTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientGetAffinityTask.java @@@ -1,64 -1,0 +1,64 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; - import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.resources.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Get affinity for task argument. + */ +public class ClientGetAffinityTask extends TaskSingleJobSplitAdapter<String, Integer> { + /** Grid. */ + @IgniteInstanceResource + private transient Ignite ignite; + + /** {@inheritDoc} */ - @Override protected Object executeJob(int gridSize, String arg) throws IgniteCheckedException { ++ @Override protected Object executeJob(int gridSize, String arg) { + A.notNull(arg, "task argument"); + + String[] split = arg.split(":", 2); + + A.ensure(split.length == 2, "Task argument should have format 'cacheName:affinityKey'."); + + String cacheName = split[0]; + String affKey = split[1]; + + if ("null".equals(cacheName)) + cacheName = null; + + ClusterNode node = ignite.cluster().mapKeyToNode(cacheName, affKey); + + return node.id().toString(); + } + + /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { ++ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientHttpTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientHttpTask.java index ee2a31b,0000000..9599e05 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientHttpTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientHttpTask.java @@@ -1,59 -1,0 +1,58 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import net.sf.json.*; - import org.apache.ignite.*; +import org.apache.ignite.compute.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Test task summarizes length of all strings in the arguments list. + * <p> + * The argument of the task is JSON-serialized array of objects to calculate string length sum of. + */ +public class ClientHttpTask extends ComputeTaskSplitAdapter<String, Integer> { + /** Task delegate. */ + private final ClientTcpTask delegate = new ClientTcpTask(); + + /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException { ++ @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) { + JSON json = JSONSerializer.toJSON(arg); + + List list = json.isArray() ? JSONArray.toList((JSONArray)json, String.class, new JsonConfig()) : null; + + //noinspection unchecked + return delegate.split(gridSize, list); + } + + /** {@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { ++ @Override public Integer reduce(List<ComputeJobResult> results) { + return delegate.reduce(results); + } + + /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { ++ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPortableArgumentTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPortableArgumentTask.java index ca587d7,0000000..3354f57 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPortableArgumentTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPortableArgumentTask.java @@@ -1,53 -1,0 +1,52 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + - import org.apache.ignite.*; +import org.apache.ignite.portables.*; + +import java.util.*; + +/** + * Task where argument and result are {@link ClientTestPortable}. + */ +public class ClientPortableArgumentTask extends TaskSingleJobSplitAdapter { + /** {@inheritDoc} */ - @Override protected Object executeJob(int gridSize, Object arg) throws IgniteCheckedException { ++ @Override protected Object executeJob(int gridSize, Object arg) { + Collection args = (Collection)arg; + + Iterator<Object> it = args.iterator(); + + assert args.size() == 2 : args.size(); + + boolean expPortable = (Boolean)it.next(); + + ClientTestPortable p; + + if (expPortable) { + PortableObject obj = (PortableObject)it.next(); + + p = obj.deserialize(); + } + else + p = (ClientTestPortable)it.next(); + + assert p != null; + + return new ClientTestPortable(p.i + 1, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPutPortableTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPutPortableTask.java index 1e936f8,0000000..c545d00 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPutPortableTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPutPortableTask.java @@@ -1,44 -1,0 +1,43 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; - import org.apache.ignite.cache.*; +import org.apache.ignite.resources.*; + +/** + * Task creates portable object and puts it in cache. + */ +public class ClientPutPortableTask extends TaskSingleJobSplitAdapter { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ - @Override protected Object executeJob(int gridSize, Object arg) throws IgniteCheckedException { ++ @Override protected Object executeJob(int gridSize, Object arg) { + String cacheName = (String)arg; + - GridCache<Object, Object> cache = ignite.cache(cacheName); ++ IgniteCache<Object, Object> cache = ignite.jcache(cacheName); + + ClientTestPortable p = new ClientTestPortable(100, true); + + cache.put(1, p); + + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java index b16a338,0000000..ecb27f0 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java @@@ -1,180 -1,0 +1,179 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; - import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.resources.*; +import org.springframework.beans.factory.*; +import org.springframework.context.support.*; + +import java.net.*; +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Start node task, applicable arguments: + * <ul> + * <li>tcp</li> + * <li>http</li> + * <li>tcp+ssl</li> + * <li>http+ssl</li> + * </ul> + */ +public class ClientStartNodeTask extends TaskSingleJobSplitAdapter<String, Integer> { + /** + * Available node's configurations. + */ + private static final Map<String, String> NODE_CFG = new HashMap<String, String>() {{ + put("tcp", "modules/clients/src/test/resources/spring-server-node.xml"); + put("http", "modules/clients/src/test/resources/spring-server-node.xml"); + put("tcp+ssl", "modules/clients/src/test/resources/spring-server-ssl-node.xml"); + put("http+ssl", "modules/clients/src/test/resources/spring-server-ssl-node.xml"); + }}; + + /** */ + @IgniteLoggerResource + private transient IgniteLogger log; + + /** */ + @IgniteInstanceResource + private transient Ignite ignite; + + /** {@inheritDoc} */ - @Override protected Object executeJob(int gridSize, String type) throws IgniteCheckedException { ++ @Override protected Object executeJob(int gridSize, String type) { + log.info(">>> Starting new grid node [currGridSize=" + gridSize + ", arg=" + type + "]"); + + if (type == null) + throw new IllegalArgumentException("Node type to start should be specified."); + + IgniteConfiguration cfg = getConfig(type); + + // Generate unique for this VM grid name. + String gridName = cfg.getGridName() + " (" + UUID.randomUUID() + ")"; + + // Update grid name (required to be unique). + cfg.setGridName(gridName); + + // Start new node in current VM. + Ignite g = G.start(cfg); + + log.info(">>> Grid started [nodeId=" + g.cluster().localNode().id() + ", name='" + g.name() + "']"); + + return true; + } + + /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) - throws IgniteCheckedException { ++ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } + + /** + * Load grid configuration for specified node type. + * + * @param type Node type to load configuration for. + * @return Grid configuration for specified node type. + */ + static IgniteConfiguration getConfig(String type) { + String path = NODE_CFG.get(type); + + if (path == null) + throw new IllegalArgumentException("Unsupported node type: " + type); + + URL url = U.resolveGridGainUrl(path); + + BeanFactory ctx = new FileSystemXmlApplicationContext(url.toString()); + + return (IgniteConfiguration)ctx.getBean("grid.cfg"); + } + + /** + * Example for start/stop node tasks. + * + * @param args Not used. + */ + public static void main(String[] args) { + String nodeType = "tcp+ssl"; + + // Start initial node = 1 + try (Ignite g = G.start(NODE_CFG.get(nodeType))) { + // Change topology. + changeTopology(g, 4, 1, nodeType); + changeTopology(g, 1, 4, nodeType); + + // Stop node by id = 0 + g.compute().execute(ClientStopNodeTask.class, g.cluster().localNode().id().toString()); + + // Wait for node stops. + //U.sleep(1000); + + assert G.allGrids().isEmpty(); + } + catch (IgniteCheckedException e) { + System.err.println("Uncaught exception: " + e.getMessage()); + + e.printStackTrace(System.err); + } + } + + /** + * Change topology. + * + * @param parent Grid to execute tasks on. + * @param add New nodes count. + * @param rmv Remove nodes count. + * @param type Type of nodes to manipulate. + * @throws IgniteCheckedException On any exception. + */ + private static void changeTopology(Ignite parent, int add, int rmv, String type) throws IgniteCheckedException { + Collection<ComputeTaskFuture<?>> tasks = new ArrayList<>(); + + IgniteCompute comp = parent.compute().withAsync(); + + // Start nodes in parallel. + while (add-- > 0) { + comp.execute(ClientStartNodeTask.class, type); + + tasks.add(comp.future()); + } + + for (ComputeTaskFuture<?> task : tasks) + task.get(); + + // Stop nodes in sequence. + while (rmv-- > 0) + parent.compute().execute(ClientStopNodeTask.class, type); + + // Wait for node stops. + //U.sleep(1000); + + Collection<String> gridNames = new ArrayList<>(); + + for (Ignite g : G.allGrids()) + gridNames.add(g.name()); + + parent.log().info(">>> Available grids: " + gridNames); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java index c930ed3,0000000..27375dc mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java @@@ -1,127 -1,0 +1,127 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; - import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; ++import org.apache.ignite.resources.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Stop node task, applicable arguments: + * <ul> + * <li>node id (as string) to stop or</li> + * <li>node type (see start nodes task).</li> + * </ul> + */ +public class ClientStopNodeTask extends ComputeTaskSplitAdapter<String, Integer> { + /** */ + @IgniteLoggerResource + private transient IgniteLogger log; + + /** */ + @IgniteInstanceResource + private transient Ignite ignite; + + /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException { ++ @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) { + Collection<ComputeJob> jobs = new ArrayList<>(); + + for (int i = 0; i < gridSize; i++) + jobs.add(new StopJob(arg)); + + return jobs; + } + + /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { ++ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + ComputeJobResultPolicy superRes = super.result(res, rcvd); + + // Deny failover. + if (superRes == FAILOVER) + superRes = WAIT; + + return superRes; + } + + /** {@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { ++ @Override public Integer reduce(List<ComputeJobResult> results) { + int stoppedCnt = 0; + + for (ComputeJobResult res : results) + if (!res.isCancelled()) + stoppedCnt+=(Integer)res.getData(); + + return stoppedCnt; + } + + /** + * Stop node job it is executed on. + */ + private static class StopJob extends ComputeJobAdapter { + /** */ + private final String gridType; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private StopJob(String gridType) { + this.gridType = gridType; + } + + /** {@inheritDoc} */ + @Override public Object execute() { + log.info(">>> Stop node [nodeId=" + ignite.cluster().localNode().id() + ", name='" + ignite.name() + "']"); + + String prefix = ClientStartNodeTask.getConfig(gridType).getGridName() + " ("; + + if (!ignite.name().startsWith(prefix)) { + int stoppedCnt = 0; + + for (Ignite g : G.allGrids()) + if (g.name().startsWith(prefix)) { + try { + log.info(">>> Grid stopping [nodeId=" + g.cluster().localNode().id() + + ", name='" + g.name() + "']"); + + G.stop(g.name(), true); + + stoppedCnt++; + } + catch (IllegalStateException e) { + log.warning("Failed to stop grid.", e); + } + } + + return stoppedCnt; + } + + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStringLengthTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStringLengthTask.java index 398c0e8,0000000..2aa0a4f mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStringLengthTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStringLengthTask.java @@@ -1,73 -1,0 +1,71 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + - import org.apache.ignite.*; +import org.apache.ignite.compute.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Test task calculate length of the string passed in the argument. + * <p> + * The argument of the task is a simple string to calculate length of. + */ +public class ClientStringLengthTask extends ComputeTaskSplitAdapter<String, Integer> { + /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException { ++ @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(); + + if (arg != null) + for (final Object val : arg.split("")) + jobs.add(new ComputeJobAdapter() { + @Override public Object execute() { + try { + Thread.sleep(5); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + + return val == null ? 0 : val.toString().length(); + } + }); + + return jobs; + } + + /** {@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { ++ @Override public Integer reduce(List<ComputeJobResult> results) { + int sum = 0; + + for (ComputeJobResult res : results) + sum += res.<Integer>getData(); + + return sum; + } + + /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) - throws IgniteCheckedException { ++ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTask.java index 93d5b24,0000000..caf6e87 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTask.java @@@ -1,73 -1,0 +1,71 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + - import org.apache.ignite.*; +import org.apache.ignite.compute.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Test task summarizes length of all strings in the arguments list. + * <p> + * The argument of the task is a collection of objects to calculate string length sum of. + */ +public class ClientTcpTask extends ComputeTaskSplitAdapter<List<Object>, Integer> { + /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, List<Object> list) - throws IgniteCheckedException { ++ @Override protected Collection<? extends ComputeJob> split(int gridSize, List<Object> list) { + Collection<ComputeJobAdapter> jobs = new ArrayList<>(); + + if (list != null) + for (final Object val : list) + jobs.add(new ComputeJobAdapter() { + @Override public Object execute() { + try { + Thread.sleep(5); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + + return val == null ? 0 : val.toString().length(); + } + }); + + return jobs; + } + + /** {@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { ++ @Override public Integer reduce(List<ComputeJobResult> results) { + int sum = 0; + + for (ComputeJobResult res : results) + sum += res.<Integer>getData(); + + return sum; + } + + /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { ++ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortable.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortable.java index 7f9aa46,0000000..2257a47 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortable.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortable.java @@@ -1,490 -1,0 +1,490 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + - import org.apache.ignite.portables.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.portables.*; + +import java.io.*; +import java.util.*; + +/** + * Test portable object. + */ +@SuppressWarnings("PublicField") +public class ClientTestPortable implements PortableMarshalAware, Serializable { + /** */ + public byte b; + + /** */ + public byte bRaw; + + /** */ + public short s; + + /** */ + public short sRaw; + + /** */ + public int i; + + /** */ + public int iRaw; + + /** */ + public long l; + + /** */ + public long lRaw; + + /** */ + public float f; + + /** */ + public float fRaw; + + /** */ + public double d; + + /** */ + public double dRaw; + + /** */ + public char c; + + /** */ + public char cRaw; + + /** */ + public boolean bool; + + /** */ + public boolean boolRaw; + + /** */ + public String str; + + /** */ + public String strRaw; + + /** */ + public UUID uuid; + + /** */ + public UUID uuidRaw; + + /** */ + public Date date; + + /** */ + public Date dateRaw; + + /** */ + public TestEnum e; + + /** */ + public TestEnum eRaw; + + /** */ + public byte[] bArr; + + /** */ + public byte[] bArrRaw; + + /** */ + public short[] sArr; + + /** */ + public short[] sArrRaw; + + /** */ + public int[] iArr; + + /** */ + public int[] iArrRaw; + + /** */ + public long[] lArr; + + /** */ + public long[] lArrRaw; + + /** */ + public float[] fArr; + + /** */ + public float[] fArrRaw; + + /** */ + public double[] dArr; + + /** */ + public double[] dArrRaw; + + /** */ + public char[] cArr; + + /** */ + public char[] cArrRaw; + + /** */ + public boolean[] boolArr; + + /** */ + public boolean[] boolArrRaw; + + /** */ + public String[] strArr; + + /** */ + public String[] strArrRaw; + + /** */ + public UUID[] uuidArr; + + /** */ + public UUID[] uuidArrRaw; + + /** */ + public Date[] dateArr; + + /** */ + public Date[] dateArrRaw; + + /** */ + public TestEnum[] eArr; + + /** */ + public TestEnum[] eArrRaw; + + /** */ + public Object[] objArr; + + /** */ + public Object[] objArrRaw; + + /** */ + public Collection<String> col; + + /** */ + public Collection<String> colRaw; + + /** */ + public Map<Integer, String> map; + + /** */ + public Map<Integer, String> mapRaw; + + /** */ + public ClientTestPortable portable1; + + /** */ + public ClientTestPortable portable2; + + /** */ + public ClientTestPortable portableRaw1; + + /** */ + public ClientTestPortable portableRaw2; + + /** + */ + public ClientTestPortable() { + // No-op. + } + + /** + * @param val Value. + * @param createInner If {@code true} creates nested object. + */ + public ClientTestPortable(int val, boolean createInner) { + b = (byte)val; + bRaw = (byte)(val + 1); + + s = (short)val; + sRaw = (short)(val + 1); + + i = val; + iRaw = i + 1; + + l = val; + lRaw = i + 1; + + f = val + 0.5f; + fRaw = f + 1; + + d = val + 0.5f; + dRaw = d + 1; + + c = (char)val; + cRaw = (char)(val + 1); + + bool = true; + boolRaw = false; + + str = String.valueOf(i); + strRaw = String.valueOf(iRaw); + + uuid = new UUID(i, i); + uuidRaw = new UUID(iRaw, iRaw); + + date = new Date(i); + dateRaw = new Date(iRaw); + + e = enumValue(i); + eRaw = enumValue(iRaw); + + bArr = new byte[]{b, (byte)(b + 1)}; + bArrRaw = new byte[]{bRaw, (byte)(bRaw + 1)}; + + sArr = new short[]{s, (short)(s + 1)}; + sArrRaw = new short[]{sRaw, (short)(sRaw + 1)}; + + iArr = new int[]{i, i + 1}; + iArrRaw = new int[]{iRaw, iRaw + 1}; + + lArr = new long[]{l, l + 1}; + lArrRaw = new long[]{lRaw, lRaw + 1}; + + fArr = new float[]{f, f + 1}; + fArrRaw = new float[]{fRaw, fRaw + 1}; + + dArr = new double[]{d, d + 1}; + dArrRaw = new double[]{dRaw, dRaw + 1}; + + cArr = new char[]{c, (char)(c + 1)}; + cArrRaw = new char[]{cRaw, (char)(cRaw + 1)}; + + boolArr = new boolean[]{true, true}; + boolArrRaw = new boolean[]{true, true}; + + strArr = new String[]{str, str + "1"}; + strArrRaw = new String[]{strRaw, strRaw + "1"}; + + uuidArr = new UUID[]{uuid, new UUID(uuid.getMostSignificantBits() + 1, uuid.getLeastSignificantBits() + 1)}; + uuidArrRaw = new UUID[]{uuidRaw, + new UUID(uuidRaw.getMostSignificantBits() + 1, uuidRaw.getLeastSignificantBits() + 1)}; + + dateArr = new Date[]{date, new Date(date.getTime() + 1)}; + dateArrRaw = new Date[]{dateRaw, new Date(dateRaw.getTime() + 1)}; + + eArr = new TestEnum[]{enumValue(i), enumValue(i + 1)}; + eArrRaw = new TestEnum[]{enumValue(iRaw), enumValue(iRaw + 1)}; + + objArr = new Object[]{uuid, new UUID(uuid.getMostSignificantBits() + 1, uuid.getLeastSignificantBits() + 1)}; + objArrRaw = new Object[]{uuidRaw, + new UUID(uuidRaw.getMostSignificantBits() + 1, uuidRaw.getLeastSignificantBits() + 1)}; + + col = Arrays.asList(str, str + "1"); + colRaw = Arrays.asList(strRaw, strRaw + "1"); + + map = new HashMap<>(); + map.put(1, str); + map.put(2, str + "1"); + + mapRaw = new HashMap<>(); + mapRaw.put(1, strRaw); + mapRaw.put(2, strRaw + "1"); + + if (createInner) { + portable1 = new ClientTestPortable(val + 1, false); + portable2 = portable1; + + portableRaw1 = new ClientTestPortable(val + 2, false); + portableRaw2 = portableRaw1; + } + } + + /** {@inheritDoc} */ + @Override public void writePortable(PortableWriter writer) throws PortableException { + writer.writeByte("_b", b); + writer.writeShort("_s", s); + writer.writeInt("_i", i); + writer.writeLong("_l", l); + writer.writeFloat("_f", f); + writer.writeDouble("_d", d); + writer.writeChar("_c", c); + writer.writeBoolean("_bool", bool); + writer.writeString("_str", str); + writer.writeUuid("_uuid", uuid); + writer.writeDate("_date", date); + writer.writeEnum("_enum", e); + writer.writeByteArray("_bArr", bArr); + writer.writeShortArray("_sArr", sArr); + writer.writeIntArray("_iArr", iArr); + writer.writeLongArray("_lArr", lArr); + writer.writeFloatArray("_fArr", fArr); + writer.writeDoubleArray("_dArr", dArr); + writer.writeCharArray("_cArr", cArr); + writer.writeBooleanArray("_boolArr", boolArr); + writer.writeStringArray("_strArr", strArr); + writer.writeUuidArray("_uuidArr", uuidArr); + writer.writeDateArray("_dateArr", dateArr); + writer.writeEnumArray("_eArr", eArr); + writer.writeObjectArray("_objArr", objArr); + writer.writeCollection("_col", col); + writer.writeMap("_map", map); + writer.writeObject("_portable1", portable1); + writer.writeObject("_portable2", portable2); + + PortableRawWriter raw = writer.rawWriter(); + + raw.writeByte(bRaw); + raw.writeShort(sRaw); + raw.writeInt(iRaw); + raw.writeLong(lRaw); + raw.writeFloat(fRaw); + raw.writeDouble(dRaw); + raw.writeChar(cRaw); + raw.writeBoolean(boolRaw); + raw.writeString(strRaw); + raw.writeUuid(uuidRaw); + raw.writeDate(dateRaw); + raw.writeEnum(eRaw); + raw.writeByteArray(bArrRaw); + raw.writeShortArray(sArrRaw); + raw.writeIntArray(iArrRaw); + raw.writeLongArray(lArrRaw); + raw.writeFloatArray(fArrRaw); + raw.writeDoubleArray(dArrRaw); + raw.writeCharArray(cArrRaw); + raw.writeBooleanArray(boolArrRaw); + raw.writeStringArray(strArrRaw); + raw.writeUuidArray(uuidArrRaw); + raw.writeDateArray(dateArrRaw); + raw.writeEnumArray(eArrRaw); + raw.writeObjectArray(objArrRaw); + raw.writeCollection(colRaw); + raw.writeMap(mapRaw); + raw.writeObject(portableRaw1); + raw.writeObject(portableRaw2); + } + + /** {@inheritDoc} */ + @Override public void readPortable(PortableReader reader) throws PortableException { + b = reader.readByte("_b"); + s = reader.readShort("_s"); + i = reader.readInt("_i"); + l = reader.readLong("_l"); + f = reader.readFloat("_f"); + d = reader.readDouble("_d"); + c = reader.readChar("_c"); + bool = reader.readBoolean("_bool"); + str = reader.readString("_str"); + uuid = reader.readUuid("_uuid"); + date = reader.readDate("_date"); + e = reader.readEnum("_enum", TestEnum.class); + bArr = reader.readByteArray("_bArr"); + sArr = reader.readShortArray("_sArr"); + iArr = reader.readIntArray("_iArr"); + lArr = reader.readLongArray("_lArr"); + fArr = reader.readFloatArray("_fArr"); + dArr = reader.readDoubleArray("_dArr"); + cArr = reader.readCharArray("_cArr"); + boolArr = reader.readBooleanArray("_boolArr"); + strArr = reader.readStringArray("_strArr"); + uuidArr = reader.readUuidArray("_uuidArr"); + dateArr = reader.readDateArray("_dateArr"); + eArr = reader.readEnumArray("_eArr", TestEnum.class); + objArr = reader.readObjectArray("_objArr"); + col = reader.readCollection("_col"); + map = reader.readMap("_map"); + portable1 = (ClientTestPortable)reader.readObject("_portable1"); + portable2 = (ClientTestPortable)reader.readObject("_portable2"); + + PortableRawReader raw = reader.rawReader(); + + bRaw = raw.readByte(); + sRaw = raw.readShort(); + iRaw = raw.readInt(); + lRaw = raw.readLong(); + fRaw = raw.readFloat(); + dRaw = raw.readDouble(); + cRaw = raw.readChar(); + boolRaw = raw.readBoolean(); + strRaw = raw.readString(); + uuidRaw = raw.readUuid(); + dateRaw = raw.readDate(); + eRaw = raw.readEnum(TestEnum.class); + bArrRaw = raw.readByteArray(); + sArrRaw = raw.readShortArray(); + iArrRaw = raw.readIntArray(); + lArrRaw = raw.readLongArray(); + fArrRaw = raw.readFloatArray(); + dArrRaw = raw.readDoubleArray(); + cArrRaw = raw.readCharArray(); + boolArrRaw = raw.readBooleanArray(); + strArrRaw = raw.readStringArray(); + uuidArrRaw = raw.readUuidArray(); + dateArrRaw = raw.readDateArray(); + eArrRaw = raw.readEnumArray(TestEnum.class); + objArrRaw = raw.readObjectArray(); + colRaw = raw.readCollection(); + mapRaw = raw.readMap(); + portableRaw1 = (ClientTestPortable)raw.readObject(); + portableRaw2 = (ClientTestPortable)raw.readObject(); + } + + /** + * @param idx Value index. + * @return Enum value. + */ + static TestEnum enumValue(int idx) { + return TestEnum.values()[idx % TestEnum.values().length]; + } + + /** + * Test enum. + */ + private enum TestEnum { + /** */ + VAL1, + + /** */ + VAL2, + + /** */ + VAl3, + + /** */ + VAL4, + + /** */ + VAL5, + + /** */ + VAL6, + + /** */ + VAL7, + + /** */ + VAL8, + + /** */ + VAL9, + + /** */ + VAL10 + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientTestPortable.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortableAffinityKeyTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortableAffinityKeyTask.java index e94f8d5,0000000..493be08 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortableAffinityKeyTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortableAffinityKeyTask.java @@@ -1,85 -1,0 +1,85 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Task used to test portable affinity key. + */ +public class ClientTestPortableAffinityKeyTask extends ComputeTaskAdapter<Object, Boolean> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> clusterNodes, - @Nullable final Object arg) throws IgniteCheckedException { ++ @Nullable final Object arg) { + for (ClusterNode node : clusterNodes) { + if (node.isLocal()) + return Collections.singletonMap(new ComputeJobAdapter() { - @Override public Object execute() throws IgniteCheckedException { ++ @Override public Object execute() { + return executeJob(arg); + } + }, node); + } + - throw new IgniteCheckedException("Failed to find local node in task topology: " + clusterNodes); ++ throw new IgniteException("Failed to find local node in task topology: " + clusterNodes); + } + + /** {@inheritDoc} */ - @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteCheckedException { ++ @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) { + return results.get(0).getData(); + } + + /** + * @param arg Argument. + * @return Execution result. - * @throws IgniteCheckedException If failed. ++ * @throws IgniteException If failed. + */ - protected Boolean executeJob(Object arg) throws IgniteCheckedException { ++ protected Boolean executeJob(Object arg) throws IgniteException { + Collection args = (Collection)arg; + + Iterator<Object> it = args.iterator(); + + assert args.size() == 3 : args.size(); + + PortableObject obj = (PortableObject)it.next(); + + String cacheName = (String)it.next(); + + String expAffKey = (String)it.next(); + + Object affKey = ignite.cache(cacheName).affinity().affinityKey(obj); + + if (!expAffKey.equals(affKey)) - throw new IgniteCheckedException("Unexpected affinity key: " + affKey); ++ throw new IgniteException("Unexpected affinity key: " + affKey); + + if (!ignite.cache(cacheName).affinity().mapKeyToNode(obj).isLocal()) - throw new IgniteCheckedException("Job is not run on primary node."); ++ throw new IgniteException("Job is not run on primary node."); + + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/SleepTestTask.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/SleepTestTask.java index ace975e,0000000..2345fe0 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/SleepTestTask.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/SleepTestTask.java @@@ -1,68 -1,0 +1,66 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + - import org.apache.ignite.*; +import org.apache.ignite.compute.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Test task, that sleeps for 10 seconds in split and returns + * the length of an argument. + */ +public class SleepTestTask extends ComputeTaskSplitAdapter<String, Integer> { + /** {@inheritDoc} */ - @Override public Collection<? extends ComputeJob> split(int gridSize, String arg) - throws IgniteCheckedException { ++ @Override public Collection<? extends ComputeJob> split(int gridSize, String arg) { + return Collections.singleton(new ComputeJobAdapter(arg) { + @Override public Object execute() { + try { + Thread.sleep(10000); + + String val = argument(0); + + return val == null ? 0 : val.length(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + + /** {@inheritDoc} */ - @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { ++ @Override public Integer reduce(List<ComputeJobResult> results) { + int sum = 0; + + for (ComputeJobResult res : results) + sum += res.<Integer>getData(); + + return sum; + } + + /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { ++ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/clients/src/test/java/org/apache/ignite/internal/client/TaskSingleJobSplitAdapter.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/client/TaskSingleJobSplitAdapter.java index 3289e8c,0000000..a35d806 mode 100644,000000..100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/TaskSingleJobSplitAdapter.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/TaskSingleJobSplitAdapter.java @@@ -1,75 -1,0 +1,70 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; + +import java.util.*; + +/** + * Adapter for {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} + * overriding {@code split(...)} method to return singleton with self instance. + * This adapter should be used for tasks that always splits to a single task. + * @param <T> Type of the task execution argument. + * @param <R> Type of the task result returning from {@link org.apache.ignite.compute.ComputeTask#reduce(List)} method. + */ +public abstract class TaskSingleJobSplitAdapter<T, R> extends ComputeTaskSplitAdapter<T, R> { + /** Empty constructor. */ + protected TaskSingleJobSplitAdapter() { + // No-op. + } + + /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(final int gridSize, final T arg) throws IgniteCheckedException { ++ @Override protected Collection<? extends ComputeJob> split(final int gridSize, final T arg) { + return Collections.singleton(new ComputeJobAdapter() { - @Override public Object execute() throws IgniteCheckedException { ++ @Override public Object execute() { + return executeJob(gridSize, arg); + } + }); + } + + /** {@inheritDoc} */ - @Override public R reduce(List<ComputeJobResult> results) throws IgniteCheckedException { ++ @Override public R reduce(List<ComputeJobResult> results) { + assert results.size() == 1; + + ComputeJobResult res = results.get(0); + + if (res.isCancelled()) - throw new IgniteCheckedException("Reduce receives failed job."); ++ throw new IgniteException("Reduce receives failed job."); + + return res.getData(); + } + + /** + * Executes this task's job. + * + * @param gridSize Number of available grid nodes. Note that returned number of + * jobs can be less, equal or greater than this grid size. + * @param arg Task execution argument. Can be {@code null}. + * @return Job execution result (possibly {@code null}). This result will be returned + * in {@link org.apache.ignite.compute.ComputeJobResult#getData()} method passed into + * {@link org.apache.ignite.compute.ComputeTask#result(org.apache.ignite.compute.ComputeJobResult, List)} method into task on caller node. - * @throws IgniteCheckedException If job execution caused an exception. This exception will be - * returned in {@link org.apache.ignite.compute.ComputeJobResult#getException()} method passed into - * {@link org.apache.ignite.compute.ComputeTask#result(org.apache.ignite.compute.ComputeJobResult, List)} method into task on caller node. - * If execution produces a {@link RuntimeException} or {@link Error}, then - * it will be wrapped into {@link IgniteCheckedException}. + */ - protected abstract Object executeJob(int gridSize, T arg) throws IgniteCheckedException; ++ protected abstract Object executeJob(int gridSize, T arg); +}