http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java index d3f0cd0,2c6e70f..75dcbe4 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java @@@ -105,31 -91,12 +91,13 @@@ public abstract class GridCacheQueueMul @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi spi = new TcpDiscoverySpi(); + cfg.setPublicThreadPoolSize(RETRIES * 2); - spi.setIpFinder(ipFinder); + cfg.setSystemThreadPoolSize(RETRIES * 2); - cfg.setDiscoverySpi(spi); + cfg.setMarshaller(new OptimizedMarshaller(false)); + cfg.setConnectorConfiguration(null); - cfg.setExecutorService( - new ThreadPoolExecutor( - RETRIES * 2, - RETRIES * 2, - 0, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>())); - - cfg.setExecutorServiceShutdown(true); - - cfg.setSystemExecutorService( - new ThreadPoolExecutor( - RETRIES * 2, - RETRIES * 2, - 0, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>())); - - cfg.setSystemExecutorServiceShutdown(true); - return cfg; }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadLifecycleAbstractTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadLifecycleAbstractTest.java index 96ff0a6,ad16abc..802ac4a --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadLifecycleAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadLifecycleAbstractTest.java @@@ -79,10 -77,9 +77,10 @@@ public abstract class GridCachePreloadL c.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); c.setIncludeProperties(); - c.setDeploymentMode(IgniteDeploymentMode.SHARED); + c.setDeploymentMode(DeploymentMode.SHARED); c.setNetworkTimeout(10000); + c.setConnectorConfiguration(null); - c.setMarshaller(new IgniteOptimizedMarshaller(false)); + c.setMarshaller(new OptimizedMarshaller(false)); // c.setPeerClassLoadingLocalClassPathExclude(GridCachePreloadLifecycleAbstractTest.class.getName(), // MyValue.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReaderPreloadSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsModesSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java index f23557e,ea2b8f8..cf6fdd1 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java @@@ -62,7 -62,11 +62,11 @@@ public class GridCacheCommandHandlerSel IgniteConfiguration cfg = super.getConfiguration(); cfg.setLocalHost("localhost"); - cfg.setConnectorConfiguration(new ConnectorConfiguration()); + - ClientConnectionConfiguration clnCfg = new ClientConnectionConfiguration(); ++ ConnectorConfiguration clnCfg = new ConnectorConfiguration(); + clnCfg.setRestTcpHost("localhost"); + - cfg.setClientConnectionConfiguration(clnCfg); ++ cfg.setConnectorConfiguration(clnCfg); cfg.setDiscoverySpi(disco); cfg.setCacheConfiguration(cacheCfg); // Add 'null' cache configuration. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedIgniteHomeSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedIgniteHomeSelfTest.java index 0000000,2da9d7f..de32c5b mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedIgniteHomeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedIgniteHomeSelfTest.java @@@ -1,0 -1,105 +1,106 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.util; + + import junit.framework.*; + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.logger.java.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.junits.common.*; + + import static org.apache.ignite.IgniteSystemProperties.*; + import static org.apache.ignite.internal.util.IgniteUtils.*; + + /** + * Checks that node can be started without operations with undefined IGNITE_HOME. + * <p> + * Notes: + * 1. The test intentionally extends JUnit {@link TestCase} class to make the test + * independent from {@link GridCommonAbstractTest} stuff. + * 2. Do not replace native Java asserts with JUnit ones - test won't fall on TeamCity. + */ + public class GridStartupWithUndefinedIgniteHomeSelfTest extends TestCase { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int GRID_COUNT = 2; + + /** {@inheritDoc} */ + @Override protected void tearDown() throws Exception { + // Next grid in the same VM shouldn't use cached values produced by these tests. + nullifyHomeDirectory(); + + U.getIgniteHome(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStopWithUndefinedIgniteHome() throws Exception { + IgniteUtils.nullifyHomeDirectory(); + + // We can't use U.getIgniteHome() here because + // it will initialize cached value which is forbidden to override. 
+ String ggHome = IgniteSystemProperties.getString(IGNITE_HOME); + + assert ggHome != null; + + U.setIgniteHome(null); + + String ggHome0 = U.getIgniteHome(); + + assert ggHome0 == null; + + IgniteLogger log = new JavaLogger(); + + log.info(">>> Test started: " + getName()); + log.info("Grid start-stop test count: " + GRID_COUNT); + + for (int i = 0; i < GRID_COUNT; i++) { + TcpDiscoverySpi disc = new TcpDiscoverySpi(); + + disc.setIpFinder(IP_FINDER); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + // We have to explicitly configure path to license config because of undefined IGNITE_HOME. + cfg.setLicenseUrl("file:///" + ggHome + "/" + Ignition.DFLT_LIC_FILE_NAME); + + // Default console logger is used + cfg.setGridLogger(log); + cfg.setDiscoverySpi(disc); ++ cfg.setConnectorConfiguration(null); + + try (Ignite g = G.start(cfg)) { + assert g != null; + + ggHome0 = U.getIgniteHome(); + + assert ggHome0 == null; + + X.println("Stopping grid " + g.cluster().localNode().id()); + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTcpCommunicationBenchmark.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index 48400cf,137d2db..eda0356 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@@ -97,8 -98,7 +98,8 @@@ public class GridTcpSpiForwardingSelfTe spi.setLocalPortRange(1); cfg.setDiscoverySpi(spi); cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); - 
cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + cfg.setMarshaller(new OptimizedMarshaller(false)); TcpCommunicationSpi commSpi = new TcpCommunicationSpi() { @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 0000000,c4304b2..91cb20d mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@@ -1,0 -1,988 +1,990 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.spi.discovery.tcp; + + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.port.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.spi.*; + import org.apache.ignite.spi.discovery.*; + import org.apache.ignite.spi.discovery.tcp.internal.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.spi.discovery.tcp.messages.*; + import org.apache.ignite.testframework.*; + import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static java.util.concurrent.TimeUnit.*; + import static org.apache.ignite.events.EventType.*; + import static org.apache.ignite.spi.IgnitePortProtocol.*; + + /** + * Test for {@link TcpDiscoverySpi}. + */ + public class TcpDiscoverySelfTest extends GridCommonAbstractTest { + /** */ + private TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private Map<String, TcpDiscoverySpi> discoMap = new HashMap<>(); + + /** */ + private UUID nodeId; + + /** + * @throws Exception If fails. 
+ */ + public TcpDiscoverySelfTest() throws Exception { + super(false); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "deprecation"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi; + + if (gridName.contains("FailBeforeNodeAddedSentSpi")) + spi = new FailBeforeNodeAddedSentSpi(); + else if (gridName.contains("FailBeforeNodeLeftSentSpi")) + spi = new FailBeforeNodeLeftSentSpi(); + else + spi = new TcpDiscoverySpi(); + + discoMap.put(gridName, spi); + + spi.setIpFinder(ipFinder); + + spi.setNetworkTimeout(2500); + + spi.setHeartbeatFrequency(1000); + + spi.setMaxMissedHeartbeats(3); + + spi.setIpFinderCleanFrequency(5000); + + spi.setJoinTimeout(5000); + + cfg.setDiscoverySpi(spi); + + cfg.setCacheConfiguration(); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setIncludeProperties(); + + if (!gridName.contains("LoopbackProblemTest")) + cfg.setLocalHost("127.0.0.1"); + + if (gridName.contains("testFailureDetectionOnNodePing")) { + spi.setReconnectCount(1); // To make test faster: on Windows 1 connect takes 1 second. 
+ spi.setHeartbeatFrequency(40000); + } + ++ cfg.setConnectorConfiguration(null); ++ + if (nodeId != null) + cfg.setNodeId(nodeId); + + if (gridName.contains("NonSharedIpFinder")) { + TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(); + + finder.setAddresses(Arrays.asList("127.0.0.1:47501")); + + spi.setIpFinder(finder); + } + else if (gridName.contains("MulticastIpFinder")) { + TcpDiscoveryMulticastIpFinder finder = new TcpDiscoveryMulticastIpFinder(); + + finder.setAddressRequestAttempts(10); + finder.setMulticastGroup(GridTestUtils.getNextMulticastGroup(getClass())); + finder.setMulticastPort(GridTestUtils.getNextMulticastPort(getClass())); + + spi.setIpFinder(finder); + + // Loopback multicast discovery is not working on Mac OS + // (possibly due to http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7122846). + if (U.isMacOs()) + spi.setLocalAddress(F.first(U.allLocalIps())); + } + + return cfg; + } + + /** + * @throws Exception If any error occurs. + */ + public void testSingleNodeStartStop() throws Exception { + try { + startGrid(1); + } + finally { + stopGrid(1); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testThreeNodesStartStop() throws Exception { + try { + startGrid(1); + startGrid(2); + startGrid(3); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any errors occur. 
+ */ + public void testNodeConnectMessageSize() throws Exception { + try { + Ignite g1 = startGrid(1); + + final AtomicInteger gridNameIdx = new AtomicInteger(1); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + startGrid(gridNameIdx.incrementAndGet()); + + return null; + } + }, 4, "grid-starter"); + + Collection<TcpDiscoveryNode> nodes = discoMap.get(g1.name()).ring().allNodes(); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + g1.configuration().getMarshaller().marshal(nodes, bos); + + info(">>> Approximate node connect message size [topSize=" + nodes.size() + + ", msgSize=" + bos.size() / 1024.0 + "KB]"); + } + finally { + stopAllGrids(false); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testPing() throws Exception { + try { + startGrid(1); + startGrid(2); + startGrid(3); + + info("Nodes were started"); + + for (Map.Entry<String, TcpDiscoverySpi> e : discoMap.entrySet()) { + DiscoverySpi spi = e.getValue(); + + for (Ignite g : G.allGrids()) { + boolean res = spi.pingNode(g.cluster().localNode().id()); + + assert res : e.getKey() + " failed to ping " + g.cluster().localNode().id() + " of " + g.name(); + + info(e.getKey() + " pinged " + g.cluster().localNode().id() + " of " + g.name()); + } + } + + info("All nodes pinged successfully."); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailureDetectionOnNodePing1() throws Exception { + try { + Ignite g1 = startGrid("testFailureDetectionOnNodePingCoordinator"); + startGrid("testFailureDetectionOnNodePing2"); + Ignite g3 = startGrid("testFailureDetectionOnNodePing3"); + + testFailureDetectionOnNodePing(g1, g3); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testFailureDetectionOnNodePing2() throws Exception { + try { + startGrid("testFailureDetectionOnNodePingCoordinator"); + Ignite g2 = startGrid("testFailureDetectionOnNodePing2"); + Ignite g3 = startGrid("testFailureDetectionOnNodePing3"); + + testFailureDetectionOnNodePing(g3, g2); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailureDetectionOnNodePing3() throws Exception { + try { + Ignite g1 = startGrid("testFailureDetectionOnNodePingCoordinator"); + Ignite g2 = startGrid("testFailureDetectionOnNodePing2"); + startGrid("testFailureDetectionOnNodePing3"); + + testFailureDetectionOnNodePing(g2, g1); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite failedNode) throws Exception { + final CountDownLatch cnt = new CountDownLatch(1); + + pingingNode.events().localListen( + new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, + EventType.EVT_NODE_FAILED + ); + + info("Nodes were started"); + + discoMap.get(failedNode.name()).simulateNodeFailure(); + + TcpDiscoverySpi spi = discoMap.get(pingingNode.name()); + + boolean res = spi.pingNode(failedNode.cluster().localNode().id()); + + assertFalse("Ping is ok for node " + failedNode.cluster().localNode().id() + ", but had to fail.", res); + + // Heartbeat interval is 40 seconds, but we should detect node failure faster. + assert cnt.await(7, SECONDS); + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testNodeAdded() throws Exception { + try { + final Ignite g1 = startGrid(1); + + final CountDownLatch cnt = new CountDownLatch(2); + + g1.events().localListen( + new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Node joined: " + evt.message()); + + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + TcpDiscoveryNode node = ((TcpDiscoveryNode)discoMap.get(g1.name()). + getNode(discoEvt.eventNode().id())); + + assert node != null && node.visible(); + + cnt.countDown(); + + return true; + } + }, + EventType.EVT_NODE_JOINED + ); + + startGrid(2); + startGrid(3); + + info("Nodes were started"); + + assert cnt.await(1, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testOrdinaryNodeLeave() throws Exception { + try { + Ignite g1 = startGrid(1); + startGrid(2); + startGrid(3); + + final CountDownLatch cnt = new CountDownLatch(2); + + g1.events().localListen( + new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, + EVT_NODE_LEFT + ); + + info("Nodes were started"); + + stopGrid(3); + stopGrid(2); + + boolean res = cnt.await(1, SECONDS); + + assert res; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testCoordinatorNodeLeave() throws Exception { + try { + startGrid(1); + Ignite g2 = startGrid(2); + + final CountDownLatch cnt = new CountDownLatch(1); + + g2.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + + info("Nodes were started"); + + stopGrid(1); + + assert cnt.await(1, SECONDS); + + // Start new grid, ensure that added to topology + final CountDownLatch cnt2 = new CountDownLatch(1); + + g2.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + cnt2.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + + startGrid(3); + + assert cnt2.await(1, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testOrdinaryNodeFailure() throws Exception { + try { + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + Ignite g3 = startGrid(3); + + final CountDownLatch cnt = new CountDownLatch(2); + + g1.events().localListen( + new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, + EventType.EVT_NODE_FAILED + ); + + info("Nodes were started"); + + discoMap.get(g2.name()).simulateNodeFailure(); + discoMap.get(g3.name()).simulateNodeFailure(); + + assert cnt.await(25, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testCoordinatorNodeFailure() throws Exception { + try { + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + final CountDownLatch cnt = new CountDownLatch(1); + + g2.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, EventType.EVT_NODE_FAILED); + + info("Nodes were started"); + + discoMap.get(g1.name()).simulateNodeFailure(); + + assert cnt.await(20, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testMetricsSending() throws Exception { + final AtomicBoolean stopping = new AtomicBoolean(); + + try { + final CountDownLatch latch1 = new CountDownLatch(1); + + final Ignite g1 = startGrid(1); + + IgnitePredicate<Event> lsnr1 = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info(evt.message()); + + latch1.countDown(); + + return true; + } + }; + + g1.events().localListen(lsnr1, EVT_NODE_METRICS_UPDATED); + + assert latch1.await(10, SECONDS); + + g1.events().stopLocalListen(lsnr1); + + final CountDownLatch latch1_1 = new CountDownLatch(1); + final CountDownLatch latch1_2 = new CountDownLatch(1); + final CountDownLatch latch2_1 = new CountDownLatch(1); + final CountDownLatch latch2_2 = new CountDownLatch(1); + + final Ignite g2 = startGrid(2); + + g2.events().localListen( + new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (stopping.get()) + return true; + + info(evt.message()); + + UUID id = ((DiscoveryEvent) evt).eventNode().id(); + + if (id.equals(g1.cluster().localNode().id())) + latch2_1.countDown(); + else if (id.equals(g2.cluster().localNode().id())) + latch2_2.countDown(); + else + assert false : "Event fired for unknown node."; + + return true; + } + }, + EVT_NODE_METRICS_UPDATED + ); + + g1.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (stopping.get()) + 
return true; + + info(evt.message()); + + UUID id = ((DiscoveryEvent) evt).eventNode().id(); + + if (id.equals(g1.cluster().localNode().id())) + latch1_1.countDown(); + else if (id.equals(g2.cluster().localNode().id())) + latch1_2.countDown(); + else + assert false : "Event fired for unknown node."; + + return true; + } + }, EVT_NODE_METRICS_UPDATED); + + assert latch1_1.await(10, SECONDS); + assert latch1_2.await(10, SECONDS); + assert latch2_1.await(10, SECONDS); + assert latch2_2.await(10, SECONDS); + } + finally { + stopping.set(true); + + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailBeforeNodeAddedSent() throws Exception { + try { + Ignite g1 = startGrid(1); + + final CountDownLatch joinCnt = new CountDownLatch(2); + final CountDownLatch failCnt = new CountDownLatch(1); + + g1.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_NODE_JOINED) + joinCnt.countDown(); + else if (evt.type() == EVT_NODE_FAILED) + failCnt.countDown(); + else + assert false : "Unexpected event type: " + evt; + + return true; + } + }, EVT_NODE_JOINED, EVT_NODE_FAILED); + + startGrid("FailBeforeNodeAddedSentSpi"); + + startGrid(3); + + assert joinCnt.await(10, SECONDS); + assert failCnt.await(10, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testFailBeforeNodeLeftSent() throws Exception { + try { + startGrid(1); + startGrid(2); + + startGrid("FailBeforeNodeLeftSentSpi"); + + Ignite g3 = startGrid(3); + + final CountDownLatch cnt = new CountDownLatch(1); + + g3.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + cnt.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + + stopGrid(1); + + assert cnt.await(20, SECONDS); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testIpFinderCleaning() throws Exception { + try { + ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024), + new InetSocketAddress("host2", 1024))); + + Ignite g1 = startGrid(1); + + long timeout = (long)(discoMap.get(g1.name()).getIpFinderCleanFrequency() * 1.5); + + Thread.sleep(timeout); + + assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses(); + + // Check that missing addresses are returned back. + ipFinder.unregisterAddresses(ipFinder.getRegisteredAddresses()); // Unregister valid address. + + ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024), + new InetSocketAddress("host2", 1024))); + + Thread.sleep(timeout); + + assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses(); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testNonSharedIpFinder() throws Exception { + try { + GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.sleep(4000); + + return startGrid("NonSharedIpFinder-2"); + } + }, 1, "grid-starter"); + + // This node should wait until any node "from ipFinder" appears, see log messages. + Ignite g = startGrid("NonSharedIpFinder-1"); + + assert g.cluster().localNode().order() == 2; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testMulticastIpFinder() throws Exception { + try { + for (int i = 0; i < 5; i++) { + Ignite g = startGrid("MulticastIpFinder-" + i); + + assertEquals(i + 1, g.cluster().nodes().size()); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)g.configuration().getDiscoverySpi(); + + TcpDiscoveryMulticastIpFinder ipFinder = (TcpDiscoveryMulticastIpFinder)spi.getIpFinder(); + + boolean found = false; + + for (GridPortRecord rec : ((IgniteKernal) g).context().ports().records()) { + if ((rec.protocol() == UDP) && rec.port() == ipFinder.getMulticastPort()) { + found = true; + + break; + } + } + + assertTrue("GridTcpDiscoveryMulticastIpFinder should register port." , found); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testInvalidAddressIpFinder() throws Exception { + ipFinder.setShared(false); + + ipFinder.setAddresses(Collections.singletonList("some-host")); + + try { + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + startGrid(1); + + return null; + } + }, + IgniteCheckedException.class, + null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testJoinTimeout() throws Exception { + try { + // This start will fail as expected. + Throwable t = GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + startGrid("NonSharedIpFinder-1"); + + return null; + } + }, IgniteCheckedException.class, null); + + assert X.hasCause(t, IgniteSpiException.class) : "Unexpected exception: " + t; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testDirtyIpFinder() throws Exception { + try { + // Dirty IP finder + for (int i = 47500; i < 47520; i++) + ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("127.0.0.1", i), + new InetSocketAddress("unknown-host", i))); + + assert ipFinder.isShared(); + + startGrid(1); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testDuplicateId() throws Exception { + try { + // Random ID. + startGrid(1); + + nodeId = UUID.randomUUID(); + + startGrid(2); + + // Duplicate ID. + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + // Exception will be thrown and output to log. + startGrid(3); + + return null; + } + }, + IgniteCheckedException.class, + null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testLoopbackProblemFirstNodeOnLoopback() throws Exception { + // On Windows and Mac machines two nodes can reside on the same port + // (if one node has localHost="127.0.0.1" and another has localHost="0.0.0.0"). + // So two nodes do not even discover each other. + if (U.isWindows() || U.isMacOs()) + return; + + try { + startGridNoOptimize(1); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + // Exception will be thrown because we start node which does not use loopback address, + // but the first node does. + startGridNoOptimize("LoopbackProblemTest"); + + return null; + } + }, + IgniteException.class, + null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testLoopbackProblemSecondNodeOnLoopback() throws Exception { + if (U.isWindows() || U.isMacOs()) + return; + + try { + startGridNoOptimize("LoopbackProblemTest"); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + // Exception will be thrown because we start node which uses loopback address, + // but the first node does not. + startGridNoOptimize(1); + + return null; + } + }, + IgniteException.class, + null); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testGridStartTime() throws Exception { + try { + startGridsMultiThreaded(5); + + Long startTime = null; + + IgniteKernal firstGrid = null; + + Collection<IgniteKernal> grids = new ArrayList<>(); + + for (int i = 0; i < 5 ; i++) { + IgniteKernal grid = (IgniteKernal)grid(i); + + assertTrue(grid.context().discovery().gridStartTime() > 0); + + if (i > 0) + assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); + else + startTime = grid.context().discovery().gridStartTime(); + + if (grid.localNode().order() == 1) + firstGrid = grid; + else + grids.add(grid); + } + + assertNotNull(firstGrid); + + stopGrid(firstGrid.name()); + + for (IgniteKernal grid : grids) + assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); + + grids.add((IgniteKernal) startGrid(5)); + + for (IgniteKernal grid : grids) + assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); + } + finally { + stopAllGrids(); + } + } + + /** + * Starts new grid with given index. Method optimize is not invoked. + * + * @param idx Index of the grid to start. + * @return Started grid. + * @throws Exception If anything failed. + */ + private Ignite startGridNoOptimize(int idx) throws Exception { + return startGridNoOptimize(getTestGridName(idx)); + } + + /** + * Starts new grid with given name. Method optimize is not invoked. 
+ * + * @param gridName Grid name. + * @return Started grid. + * @throws Exception If failed. + */ + private Ignite startGridNoOptimize(String gridName) throws Exception { + return G.start(getConfiguration(gridName)); + } + + /** + * Discovery SPI that, when the second {@code TcpDiscoveryNodeAddedMessage} is about to be sent across the ring, simulates a node failure and throws to avoid sending it. + */ + private static class FailBeforeNodeAddedSentSpi extends TcpDiscoverySpi { + /** Number of node-added messages seen so far. */ + private int i; + + /** {@inheritDoc} */ + @Override void onBeforeMessageSentAcrossRing(Serializable msg) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) + if (++i == 2) { + simulateNodeFailure(); + + throw new RuntimeException("Avoid message sending: " + msg.getClass()); + } + } + } + + /** + * Discovery SPI that, when a {@code TcpDiscoveryNodeLeftMessage} is about to be sent across the ring, simulates a node failure and throws to avoid sending it. + */ + private static class FailBeforeNodeLeftSentSpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override void onBeforeMessageSentAcrossRing(Serializable msg) { + if (msg instanceof TcpDiscoveryNodeLeftMessage) { + simulateNodeFailure(); + + throw new RuntimeException("Avoid message sending: " + msg.getClass()); + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySnapshotHistoryTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySnapshotHistoryTest.java index 0000000,32f4a05..b28c904 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySnapshotHistoryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySnapshotHistoryTest.java @@@ -1,0 -1,173 +1,174 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.spi.discovery.tcp; + + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.spi.discovery.*; + import org.apache.ignite.testframework.junits.common.*; + + import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.*; + + /** + * Tests for topology snapshots history. + */ + public class TcpDiscoverySnapshotHistoryTest extends GridCommonAbstractTest { + /** */ + public TcpDiscoverySnapshotHistoryTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi()); + cfg.setCacheConfiguration(); + cfg.setLocalHost("127.0.0.1"); ++ cfg.setConnectorConfiguration(null); + + return cfg; + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testHistorySupported() throws Exception { + try { + final Ignite g = startGrid(); + + DiscoverySpi spi = g.configuration().getDiscoverySpi(); + + DiscoverySpiHistorySupport ann = U.getAnnotation(spi.getClass(), DiscoverySpiHistorySupport.class); + + assertNotNull("Spi does not have annotation for history support", ann); + + assertTrue("History support is disabled for current spi", ann.value()); + } + finally { + stopGrid(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testSettingNewTopologyHistorySize() throws Exception { + try { + final Ignite g = startGrid(); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)g.configuration().getDiscoverySpi(); + + assertEquals(DFLT_TOP_HISTORY_SIZE, spi.getTopHistorySize()); + + spi.setTopHistorySize(DFLT_TOP_HISTORY_SIZE + 1); + + assertEquals(DFLT_TOP_HISTORY_SIZE + 1, spi.getTopHistorySize()); + + spi.setTopHistorySize(1); + + assertEquals(DFLT_TOP_HISTORY_SIZE + 1, spi.getTopHistorySize()); + } + finally { + stopGrid(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testNodeAdded() throws Exception { + try { + // Add grid #1 + final Ignite g1 = startGrid(1); + + assertTopVer(1, g1); + + assertEquals(1, g1.cluster().topologyVersion()); + + // Add grid # 2 + final Ignite g2 = startGrid(2); + + assertTopVer(2, g1, g2); + + for (int i = 1; i <= 2; i++) + assertEquals(i, g2.cluster().topology(i).size()); + + // Add grid # 3 + final Ignite g3 = startGrid(3); + + assertTopVer(3, g1, g2, g3); + + for (int i = 1; i <= 3; i++) + assertEquals(i, g3.cluster().topology(i).size()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testNodeAddedAndRemoved() throws Exception { + try { + // Add grid #1 + final Ignite g1 = startGrid(1); + + assertTopVer(1, g1); + + assertEquals(1, g1.cluster().topologyVersion()); + + // Add grid #2 + final Ignite g2 = startGrid(2); + + assertTopVer(2, g1, g2); + + for (int i = 1; i <= 2; i++) + assertEquals(i, g2.cluster().topology(i).size()); + + // Add grid #3 + final Ignite g3 = startGrid(3); + + assertTopVer(3, g1, g2, g3); + + for (int i = 1; i <= 3; i++) + assertEquals(i, g3.cluster().topology(i).size()); + + // Stop grid #3 + stopGrid(g3.name()); + + assertTopVer(4, g1, g2); + } + finally { + stopAllGrids(); + } + } + + /** + * Check if specified grid instances have unexpected topology version. + * + * @param expTopVer Expected topology version. + * @param ignites Grid instances for checking topology version. + */ + private static void assertTopVer(long expTopVer, Ignite... ignites) { + for (Ignite g : ignites) + assertEquals("Grid has wrong topology version.", expTopVer, g.cluster().topologyVersion()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc 
modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopDualAbstractSelfTest.java index 0000000,602548f..a091f8b mode 000000,100644..100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopDualAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopDualAbstractSelfTest.java @@@ -1,0 -1,303 +1,304 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.ignitefs; + + import org.apache.hadoop.conf.*; + import org.apache.hadoop.fs.*; + import org.apache.hadoop.fs.FileSystem; + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.fs.hadoop.*; + import org.apache.ignite.internal.processors.fs.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + + import static org.apache.ignite.cache.CacheAtomicityMode.*; + import static org.apache.ignite.cache.CacheMode.*; + import static org.apache.ignite.ignitefs.IgniteFsMode.*; + import static org.apache.ignite.ignitefs.hadoop.GridGgfsHadoopParameters.*; + import static org.apache.ignite.internal.processors.fs.GridGgfsAbstractSelfTest.*; + + /** + * Tests for GGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC. + */ + public abstract class GridGgfsHadoopDualAbstractSelfTest extends GridGgfsCommonAbstractTest { + /** GGFS block size. */ + protected static final int GGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "ggfs://ggfs-secondary:grid-secondary@127.0.0.1:11500/"; + + /** Secondary file system configuration path. */ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + /** Primary file system URI. 
*/ + protected static final String PRIMARY_URI = "ggfs://ggfs:grid@/"; + + /** Primary file system configuration path. */ + protected static final String PRIMARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback.xml"; + + /** Primary file system REST endpoint configuration map. */ + protected static final Map<String, String> PRIMARY_REST_CFG = new HashMap<String, String>() {{ + put("type", "tcp"); + put("port", "10500"); + }}; + + /** Secondary file system REST endpoint configuration map. */ + protected static final Map<String, String> SECONDARY_REST_CFG = new HashMap<String, String>() {{ + put("type", "tcp"); + put("port", "11500"); + }}; + + /** Directory. */ + protected static final IgniteFsPath DIR = new IgniteFsPath("/dir"); + + /** Sub-directory. */ + protected static final IgniteFsPath SUBDIR = new IgniteFsPath(DIR, "subdir"); + + /** File. */ + protected static final IgniteFsPath FILE = new IgniteFsPath(SUBDIR, "file"); + + /** Default data chunk (128 bytes). */ + protected static byte[] chunk; + + /** Primary GGFS. */ + protected static GridGgfsImpl ggfs; + + /** Secondary GGFS. */ + protected static GridGgfsImpl ggfsSecondary; + + /** GGFS mode. */ + protected final IgniteFsMode mode; + + /** + * Constructor. + * + * @param mode GGFS mode. + */ + protected GridGgfsHadoopDualAbstractSelfTest(IgniteFsMode mode) { + this.mode = mode; + assert mode == DUAL_SYNC || mode == DUAL_ASYNC; + } + + /** + * Start grid with GGFS. + * + * @param gridName Grid name. + * @param ggfsName GGFS name + * @param mode GGFS mode. + * @param secondaryFs Secondary file system (optional). + * @param restCfg Rest configuration string (optional). + * @return Started grid instance. + * @throws Exception If failed. 
+ */ + protected Ignite startGridWithGgfs(String gridName, String ggfsName, IgniteFsMode mode, + @Nullable IgniteFsFileSystem secondaryFs, @Nullable Map<String, String> restCfg) throws Exception { + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setDataCacheName("dataCache"); + ggfsCfg.setMetaCacheName("metaCache"); + ggfsCfg.setName(ggfsName); + ggfsCfg.setBlockSize(GGFS_BLOCK_SIZE); + ggfsCfg.setDefaultMode(mode); + ggfsCfg.setIpcEndpointConfiguration(restCfg); + ggfsCfg.setSecondaryFileSystem(secondaryFs); + ggfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + ggfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setQueryIndexEnabled(false); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setGgfsConfiguration(ggfsCfg); + + cfg.setLocalHost("127.0.0.1"); ++ cfg.setConnectorConfiguration(null); + + return G.start(cfg); + } + + /** {@inheritDoc} */ + @Override 
protected void beforeTestsStarted() throws Exception { + chunk = new byte[128]; + + for (int i = 0; i < chunk.length; i++) + chunk[i] = (byte)i; + + Ignite igniteSecondary = startGridWithGgfs("grid-secondary", "ggfs-secondary", PRIMARY, null, SECONDARY_REST_CFG); + + IgniteFsFileSystem hadoopFs = new GridGgfsHadoopFileSystemWrapper(SECONDARY_URI, SECONDARY_CFG); + + Ignite ignite = startGridWithGgfs("grid", "ggfs", mode, hadoopFs, PRIMARY_REST_CFG); + + ggfsSecondary = (GridGgfsImpl) igniteSecondary.fileSystem("ggfs-secondary"); + ggfs = (GridGgfsImpl) ignite.fileSystem("ggfs"); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + clear(ggfs); + clear(ggfsSecondary); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + G.stopAll(true); + } + + /** + * Convenience method to group paths. + * + * @param paths Paths to group. + * @return Paths as array. + */ + protected IgniteFsPath[] paths(IgniteFsPath... paths) { + return paths; + } + + /** + * Check how prefetch override works. + * + * @throws Exception If failed. + */ + public void testOpenPrefetchOverride() throws Exception { + create(ggfsSecondary, paths(DIR, SUBDIR), paths(FILE)); + + // Write enough data to the secondary file system. + final int blockSize = GGFS_BLOCK_SIZE; + + IgniteFsOutputStream out = ggfsSecondary.append(FILE, false); + + int totalWritten = 0; + + while (totalWritten < blockSize * 2 + chunk.length) { + out.write(chunk); + + totalWritten += chunk.length; + } + + out.close(); + + awaitFileClose(ggfsSecondary, FILE); + + // Instantiate file system with overridden "seq reads before prefetch" property. 
+ Configuration cfg = new Configuration(); + + cfg.addResource(U.resolveIgniteUrl(PRIMARY_CFG)); + + int seqReads = SEQ_READS_BEFORE_PREFETCH + 1; + + cfg.setInt(String.format(PARAM_GGFS_SEQ_READS_BEFORE_PREFETCH, "ggfs:grid@"), seqReads); + + FileSystem fs = FileSystem.get(new URI(PRIMARY_URI), cfg); + + // Read the first two blocks. + Path fsHome = new Path(PRIMARY_URI); + Path dir = new Path(fsHome, DIR.name()); + Path subdir = new Path(dir, SUBDIR.name()); + Path file = new Path(subdir, FILE.name()); + + FSDataInputStream fsIn = fs.open(file); + + final byte[] readBuf = new byte[blockSize * 2]; + + fsIn.readFully(0, readBuf, 0, readBuf.length); + + // Wait for a while for prefetch to finish (if any). + GridGgfsMetaManager meta = ggfs.context().meta(); + + GridGgfsFileInfo info = meta.info(meta.fileId(FILE)); + + GridGgfsBlockKey key = new GridGgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 2); + + GridCache<GridGgfsBlockKey, byte[]> dataCache = ggfs.context().kernalContext().cache().cache( + ggfs.configuration().getDataCacheName()); + + for (int i = 0; i < 10; i++) { + if (dataCache.containsKey(key)) + break; + else + U.sleep(100); + } + + fsIn.close(); + + // Remove the file from the secondary file system. + ggfsSecondary.delete(FILE, false); + + // Try reading the third block. Should fail. 
+ GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteFsInputStream in0 = ggfs.open(FILE); + + in0.seek(blockSize * 2); + + try { + in0.read(readBuf); + } + finally { + U.closeQuiet(in0); + } + + return null; + } + }, IOException.class, + "Failed to read data due to secondary file system exception: /dir/subdir/file"); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopFileSystemHandshakeSelfTest.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopFileSystemHandshakeSelfTest.java index 0000000,838d1a7..0416a3f mode 000000,100644..100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopFileSystemHandshakeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopFileSystemHandshakeSelfTest.java @@@ -1,0 -1,309 +1,310 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.ignitefs; + + import org.apache.hadoop.conf.*; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.*; + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.processors.fs.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.spi.communication.tcp.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.*; + + import java.io.*; + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + + import static org.apache.ignite.cache.CacheAtomicityMode.*; + import static org.apache.ignite.cache.CacheDistributionMode.*; + import static org.apache.ignite.cache.CacheMode.*; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + import static org.apache.ignite.ignitefs.IgniteFsMode.*; + import static org.apache.ignite.internal.fs.hadoop.GridGgfsHadoopUtils.*; + import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; + + /** + * Tests for GGFS file system handshake. + */ + public class GridGgfsHadoopFileSystemHandshakeSelfTest extends GridGgfsCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid name. */ + private static final String GRID_NAME = "grid"; + + /** GGFS name. */ + private static final String GGFS_NAME = "ggfs"; + + /** GGFS path. */ + private static final IgniteFsPath PATH = new IgniteFsPath("/path"); + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** + * Tests for Grid and GGFS having normal names. + * + * @throws Exception If failed. 
+ */ + public void testHandshake() throws Exception { + startUp(false, false); + + checkValid(GGFS_NAME + ":" + GRID_NAME + "@"); + checkValid(GGFS_NAME + ":" + GRID_NAME + "@127.0.0.1"); + checkValid(GGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(GGFS_NAME + "@"); + checkInvalid(GGFS_NAME + "@127.0.0.1"); + checkInvalid(GGFS_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(":" + GRID_NAME + "@"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(""); + checkInvalid("127.0.0.1"); + checkInvalid("127.0.0.1:" + DFLT_IPC_PORT); + } + + /** + * Tests for Grid having {@code null} name and GGFS having normal name. + * + * @throws Exception If failed. + */ + public void testHandshakeDefaultGrid() throws Exception { + startUp(true, false); + + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(GGFS_NAME + "@"); + checkValid(GGFS_NAME + "@127.0.0.1"); + checkValid(GGFS_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(":" + GRID_NAME + "@"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(""); + checkInvalid("127.0.0.1"); + checkInvalid("127.0.0.1:" + DFLT_IPC_PORT); + } + + /** + * Tests for Grid having normal name and GGFS having {@code null} name. + * + * @throws Exception If failed. 
+ */ + public void testHandshakeDefaultGgfs() throws Exception { + startUp(false, true); + + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(GGFS_NAME + "@"); + checkInvalid(GGFS_NAME + "@127.0.0.1"); + checkInvalid(GGFS_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(":" + GRID_NAME + "@"); + checkValid(":" + GRID_NAME + "@127.0.0.1"); + checkValid(":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(""); + checkInvalid("127.0.0.1"); + checkInvalid("127.0.0.1:" + DFLT_IPC_PORT); + } + + /** + * Tests for Grid having {@code null} name and GGFS having {@code null} name. + * + * @throws Exception If failed. + */ + public void testHandshakeDefaultGridDefaultGgfs() throws Exception { + startUp(true, true); + + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(GGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(GGFS_NAME + "@"); + checkInvalid(GGFS_NAME + "@127.0.0.1"); + checkInvalid(GGFS_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(":" + GRID_NAME + "@"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(""); + checkValid("127.0.0.1"); + checkValid("127.0.0.1:" + DFLT_IPC_PORT); + } + + /** + * Perform startup. + * + * @param dfltGridName Default Grid name. + * @param dfltGgfsName Default GGFS name. + * @throws Exception If failed. + */ + private void startUp(boolean dfltGridName, boolean dfltGgfsName) throws Exception { + Ignite ignite = G.start(gridConfiguration(dfltGridName, dfltGgfsName)); + + IgniteFs ggfs = ignite.fileSystem(dfltGgfsName ? null : GGFS_NAME); + + ggfs.mkdirs(PATH); + } + + /** + * Create Grid configuration. + * + * @param dfltGridName Default Grid name. 
+ * @param dfltGgfsName Default GGFS name. + * @return Grid configuration. + * @throws Exception If failed. + */ + private IgniteConfiguration gridConfiguration(boolean dfltGridName, boolean dfltGgfsName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(dfltGridName ? null : GRID_NAME); + + cfg.setLocalHost("127.0.0.1"); ++ cfg.setConnectorConfiguration(null); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("partitioned"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setDistributionMode(PARTITIONED_ONLY); + dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(128)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setQueryIndexEnabled(false); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(metaCacheCfg, dataCacheCfg); + + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setDataCacheName("partitioned"); + ggfsCfg.setMetaCacheName("replicated"); + ggfsCfg.setName(dfltGgfsName ? 
null : GGFS_NAME); + ggfsCfg.setPrefetchBlocks(1); + ggfsCfg.setDefaultMode(PRIMARY); + ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ + put("type", "tcp"); + put("port", String.valueOf(DFLT_IPC_PORT)); + }}); + + ggfsCfg.setManagementPort(-1); + ggfsCfg.setBlockSize(512 * 1024); + + cfg.setGgfsConfiguration(ggfsCfg); + + return cfg; + } + + /** + * Check valid file system endpoint: the file system must open and {@code PATH} must exist in it. + * + * @param authority Authority. + * @throws Exception If failed. + */ + private void checkValid(String authority) throws Exception { + FileSystem fs = fileSystem(authority); + + assert fs.exists(new Path(PATH.toString())); + } + + /** + * Check invalid file system endpoint: opening the file system is expected to throw {@link IOException}. + * + * @param authority Authority. + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + private void checkInvalid(final String authority) throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fileSystem(authority); + + return null; + } + }, IOException.class, null); + } + + /** + * Get the Hadoop file system for the {@code ggfs://} URI built from the given authority. + * + * @param authority Authority. + * @return File system. + * @throws Exception If failed. + */ + private static FileSystem fileSystem(String authority) throws Exception { + return FileSystem.get(new URI("ggfs://" + authority + "/"), configuration(authority)); + } + + /** + * Create configuration for test. + * + * @param authority Authority. + * @return Configuration. 
+ */ + private static Configuration configuration(String authority) { + Configuration cfg = new Configuration(); + + cfg.set("fs.defaultFS", "ggfs://" + authority + "/"); + cfg.set("fs.ggfs.impl", org.apache.ignite.ignitefs.hadoop.v1.GridGgfsHadoopFileSystem.class.getName()); + cfg.set("fs.AbstractFileSystem.ggfs.impl", + org.apache.ignite.ignitefs.hadoop.v2.GridGgfsHadoopFileSystem.class.getName()); + + cfg.setBoolean("fs.ggfs.impl.disable.cache", true); + + cfg.setBoolean(String.format(PARAM_GGFS_ENDPOINT_NO_EMBED, authority), true); + cfg.setBoolean(String.format(PARAM_GGFS_ENDPOINT_NO_LOCAL_SHMEM, authority), true); + + return cfg; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopFileSystemLoggerStateSelfTest.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopFileSystemLoggerStateSelfTest.java index 0000000,88808c0..7c97386 mode 000000,100644..100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopFileSystemLoggerStateSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopFileSystemLoggerStateSelfTest.java @@@ -1,0 -1,324 +1,325 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.ignitefs; + + import org.apache.hadoop.conf.*; + import org.apache.hadoop.fs.FileSystem; + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.ignitefs.hadoop.v1.*; + import org.apache.ignite.internal.fs.common.*; + import org.apache.ignite.internal.processors.fs.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + + import java.lang.reflect.*; + import java.net.*; + import java.nio.file.*; + import java.util.*; + + import static org.apache.ignite.cache.CacheAtomicityMode.*; + import static org.apache.ignite.cache.CacheMode.*; + import static org.apache.ignite.ignitefs.IgniteFsMode.*; + import static org.apache.ignite.ignitefs.hadoop.GridGgfsHadoopParameters.*; + + /** + * Ensures that sampling is really turned on/off. + */ + public class GridGgfsHadoopFileSystemLoggerStateSelfTest extends GridGgfsCommonAbstractTest { + /** GGFS. */ + private GridGgfsEx ggfs; + + /** File system. */ + private FileSystem fs; + + /** Whether logging is enabled in FS configuration. */ + private boolean logging; + + /** whether sampling is enabled. 
*/ + private Boolean sampling; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + U.closeQuiet(fs); + + ggfs = null; + fs = null; + + G.stopAll(true); + + logging = false; + sampling = null; + } + + /** + * Startup the grid and instantiate the file system. + * + * @throws Exception If failed. + */ + private void startUp() throws Exception { + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setDataCacheName("partitioned"); + ggfsCfg.setMetaCacheName("replicated"); + ggfsCfg.setName("ggfs"); + ggfsCfg.setBlockSize(512 * 1024); + ggfsCfg.setDefaultMode(PRIMARY); + ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ + put("type", "tcp"); + put("port", "10500"); + }}); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(128)); + cacheCfg.setBackups(0); + cacheCfg.setQueryIndexEnabled(false); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("ggfs-grid"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); + cfg.setGgfsConfiguration(ggfsCfg); + + cfg.setLocalHost("127.0.0.1"); ++ cfg.setConnectorConfiguration(null); + + Ignite g = G.start(cfg); + + ggfs = 
(GridGgfsEx)g.fileSystem("ggfs"); + + ggfs.globalSampling(sampling); + + fs = fileSystem(); + } + + /** + * When logging is disabled and sampling is not set no-op logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingDisabledSamplingNotSet() throws Exception { + startUp(); + + assert !logEnabled(); + } + + /** + * When logging is enabled and sampling is not set file logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingEnabledSamplingNotSet() throws Exception { + logging = true; + + startUp(); + + assert logEnabled(); + } + + /** + * When logging is disabled and sampling is disabled no-op logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingDisabledSamplingDisabled() throws Exception { + sampling = false; + + startUp(); + + assert !logEnabled(); + } + + /** + * When logging is enabled and sampling is disabled no-op logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingEnabledSamplingDisabled() throws Exception { + logging = true; + sampling = false; + + startUp(); + + assert !logEnabled(); + } + + /** + * When logging is disabled and sampling is enabled file logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingDisabledSamplingEnabled() throws Exception { + sampling = true; + + startUp(); + + assert logEnabled(); + } + + /** + * When logging is enabled and sampling is enabled file logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingEnabledSamplingEnabled() throws Exception { + logging = true; + sampling = true; + + startUp(); + + assert logEnabled(); + } + + /** + * Ensure sampling change through API causes changes in logging on subsequent client connections. + * + * @throws Exception If failed. + */ + public void testSamplingChange() throws Exception { + // Start with sampling not set. 
+ startUp(); + + assert !logEnabled(); + + fs.close(); + + // "Not set" => true transition. + ggfs.globalSampling(true); + + fs = fileSystem(); + + assert logEnabled(); + + fs.close(); + + // True => "not set" transition. + ggfs.globalSampling(null); + + fs = fileSystem(); + + assert !logEnabled(); + + // "Not set" => false transition. NOTE(review): fs from the previous step is not closed before being replaced below. + ggfs.globalSampling(false); + + fs = fileSystem(); + + assert !logEnabled(); + + fs.close(); + + // False => "not set" transition. + ggfs.globalSampling(null); + + fs = fileSystem(); + + assert !logEnabled(); + + fs.close(); + + // True => false transition. + ggfs.globalSampling(true); + ggfs.globalSampling(false); + + fs = fileSystem(); + + assert !logEnabled(); + + fs.close(); + + // False => true transition. + ggfs.globalSampling(true); + + fs = fileSystem(); + + assert logEnabled(); + } + + /** + * Ensure that log directory is set to GGFS when client FS connects. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + public void testLogDirectory() throws Exception { + startUp(); + + assertEquals(Paths.get(U.getIgniteHome()).normalize().toString(), + ggfs.clientLogDirectory()); + } + + /** + * Instantiate new file system. + * + * @return New file system. + * @throws Exception If failed. + */ + private GridGgfsHadoopFileSystem fileSystem() throws Exception { + Configuration fsCfg = new Configuration(); + + fsCfg.addResource(U.resolveIgniteUrl("modules/core/src/test/config/hadoop/core-site-loopback.xml")); + + fsCfg.setBoolean("fs.ggfs.impl.disable.cache", true); + + if (logging) + fsCfg.setBoolean(String.format(PARAM_GGFS_LOG_ENABLED, "ggfs:ggfs-grid@"), logging); + + fsCfg.setStrings(String.format(PARAM_GGFS_LOG_DIR, "ggfs:ggfs-grid@"), U.getIgniteHome()); + + return (GridGgfsHadoopFileSystem)FileSystem.get(new URI("ggfs://ggfs:ggfs-grid@/"), fsCfg); + } + + /** + * Checks whether the file system's client logger has logging enabled. + * + * @return {@code True} if the file system's client logger reports that logging is enabled. 
+ * @throws Exception If failed. + */ + private boolean logEnabled() throws Exception { + assert fs != null; + + Field field = fs.getClass().getDeclaredField("clientLog"); + + field.setAccessible(true); + + return ((GridGgfsLogger)field.get(fs)).isLogEnabled(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java index e2eddf8,b27ce73..f4aa1ac --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java @@@ -101,9 -100,11 +100,11 @@@ public abstract class GridHadoopAbstrac } if (restEnabled()) { - cfg.setConnectorConfiguration(new ConnectorConfiguration()); - ClientConnectionConfiguration clnCfg = new ClientConnectionConfiguration(); ++ ConnectorConfiguration clnCfg = new ConnectorConfiguration(); - cfg.getConnectorConfiguration().setPort(restPort++); - clnCfg.setRestTcpPort(restPort++); ++ clnCfg.setPort(restPort++); + - cfg.setClientConnectionConfiguration(clnCfg); ++ cfg.setConnectorConfiguration(clnCfg); } cfg.setLocalHost("127.0.0.1"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/log4j/src/test/java/org/apache/ignite/logger/log4j/GridLog4jCorrectFileNameTest.java ---------------------------------------------------------------------- diff --cc modules/log4j/src/test/java/org/apache/ignite/logger/log4j/GridLog4jCorrectFileNameTest.java index f74bfce,1db3da7..3676f4e --- a/modules/log4j/src/test/java/org/apache/ignite/logger/log4j/GridLog4jCorrectFileNameTest.java +++ 
b/modules/log4j/src/test/java/org/apache/ignite/logger/log4j/GridLog4jCorrectFileNameTest.java @@@ -104,8 -104,7 +104,8 @@@ public class GridLog4jCorrectFileNameTe IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setGridName(gridName); - cfg.setGridLogger(new IgniteLog4jLogger()); + cfg.setGridLogger(new Log4JLogger()); + cfg.setConnectorConfiguration(null); return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestProtocol.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/spring/src/test/java/org/apache/ignite/internal/GridSpringBeanSerializationSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala ---------------------------------------------------------------------- diff --cc modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala index 9857362,ae2048d..b8c51a1 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala @@@ -1518,14 -1517,12 +1517,12 @@@ object visor extends VisorTag if (cpuCnt < 4) cpuCnt = 4 - cfg.setClientConnectionConfiguration(null) + cfg.setConnectorConfiguration(null) - def createExecutor = new IgniteThreadPoolExecutor(cpuCnt, cpuCnt, Long.MaxValue, new LinkedBlockingQueue[Runnable]) - // All thread pools are overridden to have size equal to number of CPUs. 
- cfg.setExecutorService(createExecutor) - cfg.setSystemExecutorService(createExecutor) - cfg.setPeerClassLoadingExecutorService(createExecutor) + cfg.setPublicThreadPoolSize(cpuCnt) + cfg.setSystemThreadPoolSize(cpuCnt) + cfg.setPeerClassLoadingThreadPoolSize(cpuCnt) var ioSpi = cfg.getCommunicationSpi http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/yardstick/config/ignite-base-config.xml ----------------------------------------------------------------------