Merge remote-tracking branch 'remotes/origin/ignite-709_2' into ignite-836_2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8c105ec2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8c105ec2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8c105ec2 Branch: refs/heads/ignite-836_2 Commit: 8c105ec2379acb78bd5fc9185cc99b9a1bc18562 Parents: 9126679 16e211e Author: sevdokimov <sevdoki...@gridgain.com> Authored: Wed May 13 15:09:57 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Wed May 13 15:09:57 2015 +0300 ---------------------------------------------------------------------- assembly/release-base.xml | 4 +- bin/ignite-schema-import.bat | 2 +- bin/ignite-schema-import.sh | 2 +- bin/ignite.bat | 2 +- bin/ignite.sh | 2 +- bin/ignitevisorcmd.bat | 2 +- bin/ignitevisorcmd.sh | 2 +- bin/include/build-classpath.bat | 46 + bin/include/build-classpath.sh | 71 + bin/include/functions.sh | 2 +- bin/include/target-classpath.bat | 46 - bin/include/target-classpath.sh | 71 - examples/pom.xml | 2 +- modules/aop/pom.xml | 2 +- modules/aws/pom.xml | 2 +- modules/clients/pom.xml | 2 +- modules/cloud/pom.xml | 4 +- .../TcpDiscoveryCloudIpFinderSelfTest.java | 2 - modules/codegen/pom.xml | 2 +- .../ignite/codegen/MessageCodeGenerator.java | 4 +- modules/core/pom.xml | 2 +- .../internal/direct/DirectByteBufferStream.java | 4 +- .../communication/GridIoMessageFactory.java | 4 +- .../eventstorage/GridEventStorageManager.java | 5 +- .../cache/DynamicCacheDescriptor.java | 16 +- .../processors/cache/GridCacheAdapter.java | 544 +- .../cache/GridCacheEvictionManager.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 18 +- .../processors/cache/GridCacheMvccManager.java | 4 +- .../GridCachePartitionExchangeManager.java | 3 + .../processors/cache/GridCacheProcessor.java | 189 +- .../processors/cache/GridCacheProxyImpl.java | 24 - .../processors/cache/GridCacheSwapManager.java | 215 +- 
.../processors/cache/GridCacheTtlManager.java | 42 +- .../processors/cache/GridCacheUtils.java | 5 +- .../processors/cache/IgniteInternalCache.java | 27 - ...ridCacheOptimisticCheckPreparedTxFuture.java | 434 -- ...idCacheOptimisticCheckPreparedTxRequest.java | 232 - ...dCacheOptimisticCheckPreparedTxResponse.java | 179 - .../distributed/GridCacheTxRecoveryFuture.java | 506 ++ .../distributed/GridCacheTxRecoveryRequest.java | 261 + .../GridCacheTxRecoveryResponse.java | 182 + .../GridDistributedTxRemoteAdapter.java | 2 +- .../distributed/dht/GridDhtLocalPartition.java | 2 +- .../cache/distributed/dht/GridDhtTxLocal.java | 32 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 27 + .../cache/distributed/dht/GridDhtTxMapping.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 81 +- .../dht/GridPartitionedGetFuture.java | 2 +- .../colocated/GridDhtColocatedLockFuture.java | 27 +- .../colocated/GridDhtDetachedCacheEntry.java | 4 +- .../distributed/near/GridNearCacheAdapter.java | 10 - .../distributed/near/GridNearCacheEntry.java | 4 +- .../distributed/near/GridNearLockFuture.java | 5 - .../near/GridNearOptimisticTxPrepareFuture.java | 779 ++ .../GridNearPessimisticTxPrepareFuture.java | 349 + .../cache/distributed/near/GridNearTxLocal.java | 84 +- .../near/GridNearTxPrepareFuture.java | 1050 --- .../near/GridNearTxPrepareFutureAdapter.java | 226 + .../processors/cache/local/GridLocalCache.java | 8 +- .../local/atomic/GridLocalAtomicCache.java | 27 +- .../cache/query/GridCacheQueryManager.java | 21 +- .../cache/query/GridCacheSqlQuery.java | 2 +- .../cache/query/GridCacheTwoStepQuery.java | 17 + .../cache/transactions/IgniteInternalTx.java | 9 +- .../cache/transactions/IgniteTxAdapter.java | 4 +- .../cache/transactions/IgniteTxHandler.java | 100 +- .../transactions/IgniteTxLocalAdapter.java | 16 +- .../cache/transactions/IgniteTxManager.java | 185 +- .../datastreamer/DataStreamerImpl.java | 2 + .../processors/igfs/IgfsDataManager.java | 3 + 
.../processors/igfs/IgfsDeleteWorker.java | 4 + .../processors/igfs/IgfsMetaManager.java | 2 +- .../internal/processors/igfs/IgfsUtils.java | 11 +- .../offheap/GridOffHeapProcessor.java | 17 + .../processors/resource/GridResourceField.java | 11 + .../processors/resource/GridResourceIoc.java | 387 +- .../processors/resource/GridResourceMethod.java | 13 + .../resource/GridResourceProcessor.java | 4 +- .../ignite/internal/util/IgniteUtils.java | 19 +- .../internal/util/future/SettableFuture.java | 86 + .../util/lang/GridFilteredIterator.java | 2 +- .../ignite/internal/util/lang/GridFunc.java | 7218 +++++------------- .../util/offheap/GridOffHeapPartitionedMap.java | 9 + .../unsafe/GridUnsafePartitionedMap.java | 155 +- .../internal/visor/query/VisorQueryArg.java | 14 +- .../internal/visor/query/VisorQueryJob.java | 2 + .../discovery/tcp/TcpClientDiscoverySpi.java | 114 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 255 +- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 39 + .../messages/TcpDiscoveryClientPingRequest.java | 56 + .../TcpDiscoveryClientPingResponse.java | 67 + .../resources/META-INF/classnames.properties | 12 +- .../core/src/main/resources/ignite.properties | 2 +- .../internal/GridUpdateNotifierSelfTest.java | 21 +- .../processors/cache/CacheGetFromJobTest.java | 110 + .../GridCacheAbstractFailoverSelfTest.java | 12 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 227 +- .../cache/GridCacheAbstractSelfTest.java | 4 +- .../cache/OffHeapTieredTransactionSelfTest.java | 127 + .../GridCacheAbstractNodeRestartSelfTest.java | 101 +- .../distributed/IgniteTxGetAfterStopTest.java | 131 + ...xOriginatingNodeFailureAbstractSelfTest.java | 2 +- ...icOffHeapTieredMultiNodeFullApiSelfTest.java | 43 + ...ionedNearDisabledOffHeapFullApiSelfTest.java | 8 +- ...DisabledOffHeapMultiNodeFullApiSelfTest.java | 8 +- ...abledOffHeapTieredAtomicFullApiSelfTest.java | 56 + ...earDisabledOffHeapTieredFullApiSelfTest.java | 33 + 
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 + ...rDisabledPrimaryNodeFailureRecoveryTest.java | 31 + ...rtitionedPrimaryNodeFailureRecoveryTest.java | 31 + ...woBackupsPrimaryNodeFailureRecoveryTest.java | 37 + ...ePrimaryNodeFailureRecoveryAbstractTest.java | 533 ++ ...idCacheAtomicReplicatedFailoverSelfTest.java | 6 + ...CacheAtomicOffHeapTieredFullApiSelfTest.java | 32 + ...icOffHeapTieredMultiNodeFullApiSelfTest.java | 33 + ...yWriteOrderOffHeapTieredFullApiSelfTest.java | 33 + ...erOffHeapTieredMultiNodeFullApiSelfTest.java | 33 + ...achePartitionedMultiNodeFullApiSelfTest.java | 15 +- .../GridCachePartitionedNodeRestartTest.java | 4 +- ...dCachePartitionedOffHeapFullApiSelfTest.java | 8 +- ...titionedOffHeapMultiNodeFullApiSelfTest.java | 8 +- ...PartitionedOffHeapTieredFullApiSelfTest.java | 32 + ...edOffHeapTieredMultiNodeFullApiSelfTest.java | 72 + ...ePartitionedOptimisticTxNodeRestartTest.java | 4 +- .../GridCachePartitionedTxSalvageSelfTest.java | 25 +- .../GridCacheReplicatedFailoverSelfTest.java | 6 + .../GridCacheReplicatedNodeRestartSelfTest.java | 82 + ...idCacheReplicatedOffHeapFullApiSelfTest.java | 8 +- ...plicatedOffHeapMultiNodeFullApiSelfTest.java | 8 +- ...eReplicatedOffHeapTieredFullApiSelfTest.java | 33 + ...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 + .../IgniteCacheExpiryPolicyAbstractTest.java | 2 +- .../IgniteCacheExpiryPolicyTestSuite.java | 2 + .../expiry/IgniteCacheTtlCleanupSelfTest.java | 85 + ...LocalAtomicOffHeapTieredFullApiSelfTest.java | 32 + .../GridCacheLocalOffHeapFullApiSelfTest.java | 6 +- ...dCacheLocalOffHeapTieredFullApiSelfTest.java | 32 + .../igfs/IgfsClientCacheSelfTest.java | 132 + .../processors/igfs/IgfsOneClientNodeTest.java | 133 + .../processors/igfs/IgfsStreamsSelfTest.java | 2 +- .../tcp/TcpClientDiscoverySelfTest.java | 1021 --- .../TcpClientDiscoverySpiConfigSelfTest.java | 2 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 1089 +++ .../ignite/testsuites/IgniteBasicTestSuite.java | 1 + 
.../IgniteCacheFailoverTestSuite.java | 10 +- .../IgniteCacheFullApiSelfTestSuite.java | 18 + .../testsuites/IgniteCacheRestartTestSuite.java | 11 +- .../testsuites/IgniteCacheTestSuite3.java | 5 +- .../IgniteCacheTxRecoverySelfTestSuite.java | 4 + .../ignite/testsuites/IgniteIgfsTestSuite.java | 3 + .../IgniteSpiDiscoverySelfTestSuite.java | 2 +- modules/extdata/p2p/pom.xml | 2 +- modules/extdata/uri/pom.xml | 2 +- modules/gce/pom.xml | 4 +- modules/geospatial/pom.xml | 2 +- modules/hadoop/pom.xml | 2 +- modules/hibernate/pom.xml | 2 +- modules/indexing/pom.xml | 2 +- .../processors/query/h2/IgniteH2Indexing.java | 4 + .../processors/query/h2/sql/GridSqlQuery.java | 20 + .../query/h2/sql/GridSqlQueryParser.java | 10 +- .../query/h2/sql/GridSqlQuerySplitter.java | 11 +- .../processors/query/h2/sql/GridSqlSelect.java | 2 +- .../processors/query/h2/sql/GridSqlUnion.java | 2 +- .../query/h2/twostep/GridMapQueryExecutor.java | 3 + .../h2/twostep/GridReduceQueryExecutor.java | 119 +- .../cache/GridCacheOffheapIndexGetSelfTest.java | 111 + .../IgniteCacheAbstractFieldsQuerySelfTest.java | 21 + ...eQueryMultiThreadedOffHeapTiredSelfTest.java | 37 + .../IgniteCacheQueryMultiThreadedSelfTest.java | 29 +- .../IgniteCacheQuerySelfTestSuite.java | 1 + .../IgniteCacheWithIndexingTestSuite.java | 2 + modules/jcl/pom.xml | 2 +- modules/jta/pom.xml | 2 +- modules/log4j/pom.xml | 2 +- modules/rest-http/pom.xml | 2 +- modules/scalar/pom.xml | 2 +- .../ignite/scalar/ScalarConversions.scala | 8 - modules/schedule/pom.xml | 2 +- modules/schema-import/pom.xml | 2 +- .../ignite/schema/generator/CodeGenerator.java | 41 +- modules/slf4j/pom.xml | 2 +- modules/spring/pom.xml | 2 +- modules/ssh/pom.xml | 2 +- modules/tools/pom.xml | 2 +- modules/urideploy/pom.xml | 2 +- modules/visor-console/pom.xml | 2 +- .../commands/cache/VisorCacheScanCommand.scala | 2 +- modules/visor-plugins/pom.xml | 2 +- modules/web/pom.xml | 2 +- modules/yardstick/pom.xml | 2 +- parent/pom.xml | 2 + pom.xml | 115 +- 
194 files changed, 9825 insertions(+), 10127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index ad78f86,53522c7..134097b --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@@ -4476,74 -4517,76 +4514,110 @@@ public class TcpDiscoverySpi extends Tc /** * @param msg Message. 
*/ + private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) { + utilityPool.execute(new Runnable() { + @Override public void run() { + boolean res = pingNode(msg.nodeToPing()); + + final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId()); + + if (worker == null) { + if (log.isDebugEnabled()) + log.debug("Ping request from dead client node, will be skipped: " + msg.creatorNodeId()); + } + else { + TcpDiscoveryClientPingResponse pingRes = new TcpDiscoveryClientPingResponse( + getLocalNodeId(), msg.nodeToPing(), res); + + pingRes.verify(getLocalNodeId()); + + worker.addMessage(pingRes); + } + } + }); + } + + /** + * @param msg Message. + */ + private void processPingResponse(final TcpDiscoveryPingResponse msg) { + ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId()); + + if (clientWorker != null) + clientWorker.pingResult(true); + } + + /** + * @param msg Message. + */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { if (isLocalNodeCoordinator()) { - if (msg.verified()) { + boolean sndNext; + + if (!msg.verified()) { + msg.verify(getLocalNodeId()); + msg.topologyVersion(ring.topologyVersion()); + + notifyDiscoveryListener(msg); + + sndNext = true; + } + else + sndNext = false; + + if (sndNext && ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); + else { stats.onRingMessageReceived(msg); - addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id())); + try { + DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); - return; + DiscoverySpiCustomMessage nextMsg = msgObj.newMessageOnRingEnd(); + + if (nextMsg != null) + addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(nextMsg))); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal discovery custom message.", e); + } + + addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id())); } + } + else { + if (msg.verified()) + 
notifyDiscoveryListener(msg); - msg.verify(getLocalNodeId()); - msg.topologyVersion(ring.topologyVersion()); + if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); } + } - if (msg.verified()) { - DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr; + /** + * @param msg Custom message. + */ + private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) { + DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr; - TcpDiscoverySpiState spiState = spiStateCopy(); + TcpDiscoverySpiState spiState = spiStateCopy(); - Map<Long, Collection<ClusterNode>> hist; + Map<Long, Collection<ClusterNode>> hist; - synchronized (mux) { - hist = new TreeMap<>(topHist); - } + synchronized (mux) { + hist = new TreeMap<>(topHist); + } - Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion()); + Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion()); - if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) { - assert msg.messageBytes() != null; + if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) { + assert msg.messageBytes() != null; - TcpDiscoveryNode node = ring.node(msg.creatorNodeId()); + TcpDiscoveryNode node = ring.node(msg.creatorNodeId()); + if (node != null) { try { - Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 0000000,507b3e7..da40d4e mode 000000,100644..100644 --- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@@ -1,0 -1,1089 +1,1089 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.spi.discovery.tcp; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.internal.util.io.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.spi.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.spi.discovery.tcp.messages.*; + import org.apache.ignite.testframework.*; + import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static java.util.concurrent.TimeUnit.*; + import static org.apache.ignite.events.EventType.*; + + /** + * Client-based discovery tests. 
+ */ + public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final AtomicInteger srvIdx = new AtomicInteger(); + + /** */ + private static final AtomicInteger clientIdx = new AtomicInteger(); + + /** */ + private static Collection<UUID> srvNodeIds; + + /** */ + private static Collection<UUID> clientNodeIds; + + /** */ + private static int clientsPerSrv; + + /** */ + private static CountDownLatch srvJoinedLatch; + + /** */ + private static CountDownLatch srvLeftLatch; + + /** */ + private static CountDownLatch srvFailedLatch; + + /** */ + private static CountDownLatch clientJoinedLatch; + + /** */ + private static CountDownLatch clientLeftLatch; + + /** */ + private static CountDownLatch clientFailedLatch; + + /** */ + private static CountDownLatch msgLatch; + + /** */ + private UUID nodeId; + + /** */ + private TcpDiscoveryVmIpFinder clientIpFinder; + + /** */ + private long joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost("127.0.0.1"); + + if (gridName.startsWith("server")) { + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + } + else if (gridName.startsWith("client")) { + TcpClientDiscoverySpi disco = new TestTcpClientDiscovery(); + + disco.setJoinTimeout(joinTimeout); + + TcpDiscoveryVmIpFinder ipFinder; + + if (clientIpFinder != null) + ipFinder = clientIpFinder; + else { + ipFinder = new TcpDiscoveryVmIpFinder(); + + String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()). 
+ get((clientIdx.get() - 1) / clientsPerSrv).toString(); + + if (addr.startsWith("/")) + addr = addr.substring(1); + + ipFinder.setAddresses(Arrays.asList(addr)); + } + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + String nodeId = cfg.getNodeId().toString(); + + nodeId = "cc" + nodeId.substring(2); + + cfg.setNodeId(UUID.fromString(nodeId)); + } + + if (nodeId != null) + cfg.setNodeId(nodeId); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses(); + + if (!F.isEmpty(addrs)) + IP_FINDER.unregisterAddresses(addrs); + + srvIdx.set(0); + clientIdx.set(0); + + srvNodeIds = new GridConcurrentHashSet<>(); + clientNodeIds = new GridConcurrentHashSet<>(); + + clientsPerSrv = 2; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllClients(true); + stopAllServers(true); + + nodeId = null; + clientIpFinder = null; + joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT; + + assert G.allGrids().isEmpty(); + } + + /** + * + * @throws Exception + */ + public void testJoinTimeout() throws Exception { + clientIpFinder = new TcpDiscoveryVmIpFinder(); + joinTimeout = 1000; + + try { + startClientNodes(1); + + fail("Client cannot be start because no server nodes run"); + } + catch (IgniteCheckedException e) { + IgniteSpiException spiEx = e.getCause(IgniteSpiException.class); + + assert spiEx != null : e; + + assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testClientNodeJoin() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvJoinedLatch = new CountDownLatch(3); + clientJoinedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + startClientNodes(1); + + await(srvJoinedLatch); + await(clientJoinedLatch); + + checkNodes(3, 4); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeLeave() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvLeftLatch = new CountDownLatch(3); + clientLeftLatch = new CountDownLatch(2); + + attachListeners(3, 3); + + stopGrid("client-2"); + + await(srvLeftLatch); + await(clientLeftLatch); + + checkNodes(3, 2); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeFail() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvFailedLatch = new CountDownLatch(3); + clientFailedLatch = new CountDownLatch(2); + + attachListeners(3, 3); + + failClient(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(3, 2); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeJoin() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvJoinedLatch = new CountDownLatch(3); + clientJoinedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + startServerNodes(1); + + await(srvJoinedLatch); + await(clientJoinedLatch); + + checkNodes(4, 3); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeLeave() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvLeftLatch = new CountDownLatch(2); + clientLeftLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + stopGrid("server-2"); + + await(srvLeftLatch); + await(clientLeftLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. 
+ */ + public void testServerNodeFail() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty(); + + failServer(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testPing() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite srv0 = G.ignite("server-0"); + Ignite srv1 = G.ignite("server-1"); + Ignite client = G.ignite("client-0"); + + assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); + assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); + + assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id()); + assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id()); + } + + /** + * @throws Exception If failed. + */ + public void testPingFailedNodeFromClient() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite srv0 = G.ignite("server-0"); + Ignite srv1 = G.ignite("server-1"); + Ignite client = G.ignite("client-0"); + + final CountDownLatch latch = new CountDownLatch(1); + + ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() { + @Override public void apply(Socket sock) { + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id()); + assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id()); + + latch.countDown(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPingFailedClientNode() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite srv0 = G.ignite("server-0"); + Ignite srv1 = G.ignite("server-1"); + Ignite client = G.ignite("client-0"); + + ((TcpDiscoverySpiAdapter)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); + + ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).pauseSocketWrite(); + + assert !((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); + assert !((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); + + ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).resumeAll(); + + assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); + assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); + } + + /** + * @throws Exception If failed. + */ + public void testClientReconnectOnRouterFail() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + setClientRouter(2, 0); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(3); + + attachListeners(2, 3); + + failServer(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testClientReconnectOnNetworkProblem() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + setClientRouter(2, 0); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(3); + + attachListeners(2, 3); + + ((TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brokeConnection(); + + G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message. + + checkNodes(3, 3); + } + + /** + * @throws Exception If failed. 
+ */ + public void testGetMissedMessagesOnReconnect() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(2); + + checkNodes(3, 2); + + clientLeftLatch = new CountDownLatch(1); + srvLeftLatch = new CountDownLatch(2); + + attachListeners(2, 2); + + ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(); + + stopGrid("server-2"); + + await(srvLeftLatch); + await(srvLeftLatch); + + Thread.sleep(500); + + assert G.ignite("client-0").cluster().nodes().size() == 4; + assert G.ignite("client-1").cluster().nodes().size() == 5; + + clientLeftLatch = new CountDownLatch(1); + + ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll(); + + await(clientLeftLatch); + + checkNodes(2, 2); + } + + /** + * @throws Exception If failed. + */ + public void testClientSegmentation() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + // setClientRouter(2, 2); + + srvFailedLatch = new CountDownLatch(2 + 2); + clientFailedLatch = new CountDownLatch(2 + 2); + + attachListeners(2, 2); + + final CountDownLatch client2StoppedLatch = new CountDownLatch(1); + + IgnitionListener lsnr = new IgnitionListener() { + @Override public void onStateChange(@Nullable String name, IgniteState state) { + if (state == IgniteState.STOPPED_ON_SEGMENTATION) + client2StoppedLatch.countDown(); + } + }; + G.addListener(lsnr); + + try { + failServer(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + await(client2StoppedLatch); + + checkNodes(2, 2); + } + finally { + G.removeListener(lsnr); + } + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeJoinOneServer() throws Exception { + startServerNodes(1); + + srvJoinedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + startClientNodes(1); + + await(srvJoinedLatch); + + checkNodes(1, 1); + } + + /** + * @throws Exception If failed. 
+ */ + public void testClientNodeLeaveOneServer() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + srvLeftLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + stopGrid("client-0"); + + await(srvLeftLatch); + + checkNodes(1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeFailOneServer() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + failClient(0); + + await(srvFailedLatch); + + checkNodes(1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testMetrics() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + attachListeners(3, 3); + + assertTrue(checkMetrics(3, 3, 0)); + + G.ignite("client-0").compute().broadcast(F.noop()); + + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return checkMetrics(3, 3, 1); + } + }, 10000)); + + checkMetrics(3, 3, 1); + + G.ignite("server-0").compute().broadcast(F.noop()); + + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return checkMetrics(3, 3, 2); + } + }, 10000)); + } + + /** + * @param srvCnt Number of Number of server nodes. + * @param clientCnt Number of client nodes. + * @param execJobsCnt Expected number of executed jobs. + * @return Whether metrics are correct. 
+ */ + private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) { + for (int i = 0; i < srvCnt; i++) { + Ignite g = G.ignite("server-" + i); + + for (ClusterNode n : g.cluster().nodes()) { + if (n.metrics().getTotalExecutedJobs() != execJobsCnt) + return false; + } + } + + for (int i = 0; i < clientCnt; i++) { + Ignite g = G.ignite("client-" + i); + + for (ClusterNode n : g.cluster().nodes()) { + if (n.metrics().getTotalExecutedJobs() != execJobsCnt) + return false; + } + } + + return true; + } + + /** + * @throws Exception If failed. + */ + public void testDataExchangeFromServer() throws Exception { + testDataExchange("server-0"); + } + + /** + * TODO: IGNITE-587. + * + * @throws Exception If failed. + */ + public void testDataExchangeFromClient() throws Exception { + testDataExchange("client-0"); + } + + /** + * @throws Exception If failed. + */ + private void testDataExchange(String masterName) throws Exception { + startServerNodes(2); + startClientNodes(2); + + checkNodes(2, 2); + + IgniteMessaging msg = grid(masterName).message(); + + UUID id = null; + + try { + id = msg.remoteListen(null, new MessageListener()); + - msgLatch = new CountDownLatch(4); ++ msgLatch = new CountDownLatch(2); + + msg.send(null, "Message 1"); + + await(msgLatch); + + startServerNodes(1); + startClientNodes(1); + + checkNodes(3, 3); + - msgLatch = new CountDownLatch(6); ++ msgLatch = new CountDownLatch(3); + + msg.send(null, "Message 2"); + + await(msgLatch); + } + finally { + if (id != null) + msg.stopRemoteListen(id); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testDuplicateId() throws Exception { + startServerNodes(2); + + nodeId = G.ignite("server-1").cluster().localNode().id(); + + try { + startGrid("client-0"); + + assert false; + } + catch (IgniteCheckedException e) { + IgniteSpiException spiEx = e.getCause(IgniteSpiException.class); + + assert spiEx != null : e; + assert spiEx.getMessage().contains("same ID") : spiEx.getMessage(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testTimeoutWaitingNodeAddedMessage() throws Exception { + startServerNodes(2); + + final CountDownLatch cnt = new CountDownLatch(1); + + ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener( + new IgniteInClosure<TcpDiscoveryAbstractMessage>() { + @Override public void apply(TcpDiscoveryAbstractMessage msg) { + try { + cnt.await(10, MINUTES); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + } + }); + + try { + startGrid("client-0"); + + assert false; + } + catch (IgniteCheckedException e) { + cnt.countDown(); + + IgniteSpiException spiEx = e.getCause(IgniteSpiException.class); + + assert spiEx != null : e; + assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testGridStartTime() throws Exception { + startServerNodes(2); + + startClientNodes(2); + + long startTime = -1; + + for (Ignite g : G.allGrids()) { + IgniteEx kernal = (IgniteKernal)g; + + assertTrue(kernal.context().discovery().gridStartTime() > 0); + + if (startTime == -1) + startTime = kernal.context().discovery().gridStartTime(); + else + assertEquals(startTime, kernal.context().discovery().gridStartTime()); + } + } + + /** + * @param clientIdx Index. + * @throws Exception In case of error. 
+ */ + private void setClientRouter(int clientIdx, int srvIdx) throws Exception { + TcpClientDiscoverySpi disco = + (TcpClientDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi(); + + TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder(); + + String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString(); + + if (addr.startsWith("/")) + addr = addr.substring(1); + + ipFinder.setAddresses(Arrays.asList(addr)); + } + + /** + * @param cnt Number of nodes. + * @throws Exception In case of error. + */ + private void startServerNodes(int cnt) throws Exception { + for (int i = 0; i < cnt; i++) { + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + srvNodeIds.add(g.cluster().localNode().id()); + } + } + + /** + * @param cnt Number of nodes. + * @throws Exception In case of error. + */ + private void startClientNodes(int cnt) throws Exception { + for (int i = 0; i < cnt; i++) { + Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); + + clientNodeIds.add(g.cluster().localNode().id()); + } + } + + /** + * @param idx Index. + */ + private void failServer(int idx) { + ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); + } + + /** + * @param idx Index. + */ + private void failClient(int idx) { + ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); + } + + /** + * @param srvCnt Number of server nodes. + * @param clientCnt Number of client nodes. 
+ */ + private void attachListeners(int srvCnt, int clientCnt) throws Exception { + if (srvJoinedLatch != null) { + for (int i = 0; i < srvCnt; i++) { + G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Joined event fired on server: " + evt); + + srvJoinedLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + } + } + + if (srvLeftLatch != null) { + for (int i = 0; i < srvCnt; i++) { + G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Left event fired on server: " + evt); + + srvLeftLatch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + } + } + + if (srvFailedLatch != null) { + for (int i = 0; i < srvCnt; i++) { + G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Failed event fired on server: " + evt); + + srvFailedLatch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + } + } + + if (clientJoinedLatch != null) { + for (int i = 0; i < clientCnt; i++) { + G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Joined event fired on client: " + evt); + + clientJoinedLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + } + } + + if (clientLeftLatch != null) { + for (int i = 0; i < clientCnt; i++) { + G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Left event fired on client: " + evt); + + clientLeftLatch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + } + } + + if (clientFailedLatch != null) { + for (int i = 0; i < clientCnt; i++) { + G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Failed event fired on client: " + evt); + + 
clientFailedLatch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + } + } + } + + /** + * @param srvCnt Number of server nodes. + * @param clientCnt Number of client nodes. + */ + private void checkNodes(int srvCnt, int clientCnt) { + for (int i = 0; i < srvCnt; i++) { + Ignite g = G.ignite("server-" + i); + + assertTrue(srvNodeIds.contains(g.cluster().localNode().id())); + + assertFalse(g.cluster().localNode().isClient()); + + checkRemoteNodes(g, srvCnt + clientCnt - 1); + } + + for (int i = 0; i < clientCnt; i++) { + Ignite g = G.ignite("client-" + i); + + assertTrue(clientNodeIds.contains(g.cluster().localNode().id())); + + assertTrue(g.cluster().localNode().isClient()); + + checkRemoteNodes(g, srvCnt + clientCnt - 1); + } + } + + /** + * @param ignite Grid. + * @param expCnt Expected nodes count. + */ + @SuppressWarnings("TypeMayBeWeakened") + private void checkRemoteNodes(Ignite ignite, int expCnt) { + Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes(); + + assertEquals(expCnt, nodes.size()); + + for (ClusterNode node : nodes) { + UUID id = node.id(); + + if (clientNodeIds.contains(id)) + assertTrue(node.isClient()); + else if (srvNodeIds.contains(id)) + assertFalse(node.isClient()); + else + assert false : "Unexpected node ID: " + id; + } + } + + /** + * @param latch Latch. + * @throws InterruptedException If interrupted. 
*/
    private void await(CountDownLatch latch) throws InterruptedException {
        // Wait first, then read the count, so a failure message reports the count left after
        // the timeout rather than the count before waiting started.
        boolean done = latch.await(10000, MILLISECONDS);

        assertTrue("Latch count: " + latch.getCount(), done);
    }

    /**
     * Message listener that prints each received message and counts down {@code msgLatch}.
     */
    private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
        /** Ignite instance; annotated for injection. */
        @IgniteInstanceResource
        private Ignite ignite;

        /** {@inheritDoc} */
        @Override public boolean apply(UUID uuid, Object msg) {
            X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');

            msgLatch.countDown();

            // Always returns true.
            // NOTE(review): presumably this keeps the listener subscribed - confirm.
            return true;
        }
    }

    /**
     * Client discovery SPI whose socket writes and socket opening can be paused and resumed by the test.
     */
    private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
        /** Monitor used to park and wake up threads waiting on the pause flags. */
        private final Object mux = new Object();

        /** When {@code true}, {@link #writeToSocket} blocks. */
        private final AtomicBoolean writeLock = new AtomicBoolean();

        /** When {@code true}, {@link #openSocket} blocks. */
        private final AtomicBoolean openSockLock = new AtomicBoolean();

        /**
         * Blocks the calling thread while the given flag is set.
         *
         * @param lock Lock.
         */
        private void waitFor(AtomicBoolean lock) {
            try {
                synchronized (mux) {
                    // Loop guards against spurious wake-ups and notifications meant for another flag.
                    while (lock.get())
                        mux.wait();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status before propagating.
                Thread.currentThread().interrupt();

                throw new RuntimeException(e);
            }
        }

        /**
         * Sets all given flags to the same value and wakes up every waiting thread.
         *
         * @param isPause Is lock.
         * @param locks Locks.
         */
        private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
            synchronized (mux) {
                for (AtomicBoolean lock : locks)
                    lock.set(isPause);

                mux.notifyAll();
            }
        }

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
            GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
            waitFor(writeLock);

            super.writeToSocket(sock, msg, bout);
        }

        /** {@inheritDoc} */
        @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
            waitFor(openSockLock);

            return super.openSocket(sockAddr);
        }

        /**
         * Pauses socket writes; they stay blocked until {@link #resumeAll()} is called.
         */
        public void pauseSocketWrite() {
            pauseResumeOperation(true, writeLock);
        }

        /**
         * Pauses both socket opening and socket writes, then calls {@code brokeConnection()}.
         */
        public void pauseAll() {
            pauseResumeOperation(true, openSockLock, writeLock);

            brokeConnection();
        }

        /**
         * Resumes socket opening and socket writes.
         */
        public void resumeAll() {
            pauseResumeOperation(false, openSockLock, writeLock);
        }
    }
}