# ignite-63
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/537f631a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/537f631a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/537f631a Branch: refs/heads/ignite-63 Commit: 537f631a7238264d82cf58fbaec68b78aadd6895 Parents: 2000f44 Author: sboikov <sboi...@gridgain.com> Authored: Fri Jan 23 12:27:09 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jan 23 12:27:10 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/ClusterMetricsSelfTest.java | 353 ++++++++ .../internal/ClusterNodeMetricsSelfTest.java | 246 ++++++ .../ignite/internal/GridAffinityMappedTest.java | 164 ++++ .../internal/GridAffinityP2PSelfTest.java | 205 +++++ .../ignite/internal/GridAffinitySelfTest.java | 118 +++ .../GridAlwaysFailoverSpiFailSelfTest.java | 159 ++++ .../internal/GridCacheProjectionRemoveTest.java | 41 + .../internal/GridCancelOnGridStopSelfTest.java | 107 +++ .../internal/GridCancelUnusedJobSelfTest.java | 220 +++++ .../GridCancelledJobsMetricsSelfTest.java | 214 +++++ .../GridCollisionJobsContextSelfTest.java | 112 +++ .../internal/GridCommunicationSelfTest.java | 121 +++ .../GridContinuousJobAnnotationSelfTest.java | 215 +++++ .../GridContinuousJobSiblingsSelfTest.java | 145 ++++ .../internal/GridContinuousTaskSelfTest.java | 348 ++++++++ .../GridDeploymentMultiThreadedSelfTest.java | 121 +++ .../ignite/internal/GridDeploymentSelfTest.java | 535 +++++++++++++ .../internal/GridDiscoveryEventSelfTest.java | 421 ++++++++++ .../ignite/internal/GridDiscoverySelfTest.java | 419 ++++++++++ .../GridEventStorageCheckAllEventsSelfTest.java | 433 ++++++++++ ...ventStorageRuntimeConfigurationSelfTest.java | 347 ++++++++ .../internal/GridEventStorageSelfTest.java | 268 +++++++ .../internal/GridExecutorServiceTest.java | 315 ++++++++ 
.../GridExplicitImplicitDeploymentSelfTest.java | 476 +++++++++++ .../internal/GridFactoryVmShutdownTest.java | 101 +++ .../GridFailedInputParametersSelfTest.java | 154 ++++ .../GridFailoverCustomTopologySelfTest.java | 188 +++++ .../ignite/internal/GridFailoverSelfTest.java | 151 ++++ .../GridFailoverTaskWithPredicateSelfTest.java | 252 ++++++ .../internal/GridFailoverTopologySelfTest.java | 160 ++++ .../ignite/internal/GridHomePathSelfTest.java | 75 ++ .../GridJobCheckpointCleanupSelfTest.java | 164 ++++ .../GridJobCollisionCancelSelfTest.java | 276 +++++++ .../ignite/internal/GridJobContextSelfTest.java | 121 +++ .../GridJobMasterLeaveAwareSelfTest.java | 802 +++++++++++++++++++ .../internal/GridJobStealingSelfTest.java | 439 ++++++++++ .../GridJobStealingZeroActiveJobsSelfTest.java | 169 ++++ .../internal/GridJobSubjectIdSelfTest.java | 153 ++++ .../GridKernalConcurrentAccessStopSelfTest.java | 63 ++ .../ignite/internal/GridKernalTestUtils.java | 45 ++ .../internal/GridLifecycleAwareSelfTest.java | 196 +++++ .../internal/GridLifecycleBeanSelfTest.java | 312 ++++++++ .../internal/GridListenActorSelfTest.java | 233 ++++++ .../GridLocalEventListenerSelfTest.java | 73 ++ .../internal/GridManagementJobSelfTest.java | 167 ++++ .../internal/GridMultipleJobsSelfTest.java | 231 ++++++ .../internal/GridMultipleSpisSelfTest.java | 302 +++++++ .../GridMultipleVersionsDeploymentSelfTest.java | 306 +++++++ .../GridMultithreadedJobStealingSelfTest.java | 240 ++++++ .../ignite/internal/GridNodeFilterSelfTest.java | 78 ++ .../ignite/internal/GridNodeLocalSelfTest.java | 65 ++ .../GridNodeVisorAttributesSelfTest.java | 114 +++ .../internal/GridNonHistoryMetricsSelfTest.java | 124 +++ .../internal/GridProjectionAbstractTest.java | 768 ++++++++++++++++++ .../GridProjectionForCachesSelfTest.java | 256 ++++++ ...ectionLocalJobMultipleArgumentsSelfTest.java | 156 ++++ .../ignite/internal/GridProjectionSelfTest.java | 145 ++++ .../ignite/internal/GridReduceSelfTest.java | 194 +++++ 
.../internal/GridReleaseTypeSelfTest.java | 135 ++++ .../internal/GridRuntimeExceptionSelfTest.java | 302 +++++++ .../internal/GridSameVmStartupSelfTest.java | 107 +++ .../apache/ignite/internal/GridSelfTest.java | 166 ++++ .../internal/GridSpiExceptionSelfTest.java | 174 ++++ .../ignite/internal/GridStartStopSelfTest.java | 182 +++++ .../apache/ignite/internal/GridStartupMain.java | 57 ++ .../apache/ignite/internal/GridStartupTest.java | 69 ++ .../internal/GridStopWithCancelSelfTest.java | 122 +++ .../internal/GridStopWithWaitSelfTest.java | 263 ++++++ .../GridTaskCancelSingleNodeSelfTest.java | 185 +++++ .../GridTaskContinuousMapperSelfTest.java | 331 ++++++++ .../GridTaskExecutionContextSelfTest.java | 178 ++++ .../internal/GridTaskExecutionSelfTest.java | 68 ++ .../internal/GridTaskFailoverSelfTest.java | 117 +++ .../GridTaskFutureImplStopGridSelfTest.java | 213 +++++ .../GridTaskInstanceExecutionSelfTest.java | 121 +++ .../internal/GridTaskInstantiationSelfTest.java | 117 +++ .../internal/GridTaskJobRejectSelfTest.java | 160 ++++ .../internal/GridTaskListenerSelfTest.java | 106 +++ .../internal/GridTaskMapAsyncSelfTest.java | 141 ++++ .../GridTaskNameAnnotationSelfTest.java | 124 +++ .../internal/GridTaskResultCacheSelfTest.java | 129 +++ .../internal/GridTaskTimeoutSelfTest.java | 223 ++++++ .../GridTopicExternalizableSelfTest.java | 166 ++++ .../GridTopologyBuildVersionSelfTest.java | 102 +++ .../ignite/internal/GridVersionSelfTest.java | 69 ++ .../org/apache/ignite/internal/package.html | 24 + .../apache/ignite/internal/updatestatus.html | 28 + .../marshaller/GridMarshallerAbstractTest.java | 1 - .../GridContinuousTaskSelfTestSuite.java | 2 +- .../GridExternalizableSelfTestSuite.java | 2 +- .../testsuites/GridKernalSelfTestSuite.java | 1 - .../testsuites/GridRichSelfTestSuite.java | 2 +- .../testsuites/bamboo/GridBasicTestSuite.java | 2 +- .../bamboo/GridComputeGridTestSuite.java | 2 +- .../grid/kernal/ClusterMetricsSelfTest.java | 353 -------- 
.../grid/kernal/ClusterNodeMetricsSelfTest.java | 246 ------ .../grid/kernal/GridAffinityMappedTest.java | 164 ---- .../grid/kernal/GridAffinityP2PSelfTest.java | 205 ----- .../grid/kernal/GridAffinitySelfTest.java | 118 --- .../GridAlwaysFailoverSpiFailSelfTest.java | 159 ---- .../kernal/GridCacheProjectionRemoveTest.java | 41 - .../kernal/GridCancelOnGridStopSelfTest.java | 107 --- .../kernal/GridCancelUnusedJobSelfTest.java | 220 ----- .../GridCancelledJobsMetricsSelfTest.java | 214 ----- .../GridCollisionJobsContextSelfTest.java | 112 --- .../grid/kernal/GridCommunicationSelfTest.java | 121 --- .../GridContinuousJobAnnotationSelfTest.java | 215 ----- .../GridContinuousJobSiblingsSelfTest.java | 145 ---- .../grid/kernal/GridContinuousTaskSelfTest.java | 348 -------- .../GridDeploymentMultiThreadedSelfTest.java | 121 --- .../grid/kernal/GridDeploymentSelfTest.java | 535 ------------- .../grid/kernal/GridDiscoveryEventSelfTest.java | 421 ---------- .../grid/kernal/GridDiscoverySelfTest.java | 419 ---------- .../GridEventStorageCheckAllEventsSelfTest.java | 433 ---------- ...ventStorageRuntimeConfigurationSelfTest.java | 347 -------- .../grid/kernal/GridEventStorageSelfTest.java | 268 ------- .../grid/kernal/GridExecutorServiceTest.java | 315 -------- .../GridExplicitImplicitDeploymentSelfTest.java | 476 ----------- .../grid/kernal/GridFactoryVmShutdownTest.java | 101 --- .../GridFailedInputParametersSelfTest.java | 154 ---- .../GridFailoverCustomTopologySelfTest.java | 188 ----- .../grid/kernal/GridFailoverSelfTest.java | 151 ---- .../GridFailoverTaskWithPredicateSelfTest.java | 252 ------ .../kernal/GridFailoverTopologySelfTest.java | 160 ---- .../grid/kernal/GridHomePathSelfTest.java | 75 -- .../GridJobCheckpointCleanupSelfTest.java | 164 ---- .../kernal/GridJobCollisionCancelSelfTest.java | 276 ------- .../grid/kernal/GridJobContextSelfTest.java | 121 --- .../kernal/GridJobMasterLeaveAwareSelfTest.java | 802 ------------------- 
.../grid/kernal/GridJobStealingSelfTest.java | 439 ---------- .../GridJobStealingZeroActiveJobsSelfTest.java | 169 ---- .../grid/kernal/GridJobSubjectIdSelfTest.java | 153 ---- .../GridKernalConcurrentAccessStopSelfTest.java | 63 -- .../grid/kernal/GridKernalTestUtils.java | 45 -- .../grid/kernal/GridLifecycleAwareSelfTest.java | 196 ----- .../grid/kernal/GridLifecycleBeanSelfTest.java | 312 -------- .../grid/kernal/GridListenActorSelfTest.java | 233 ------ .../kernal/GridLocalEventListenerSelfTest.java | 73 -- .../grid/kernal/GridManagementJobSelfTest.java | 167 ---- .../grid/kernal/GridMultipleJobsSelfTest.java | 231 ------ .../grid/kernal/GridMultipleSpisSelfTest.java | 302 ------- .../GridMultipleVersionsDeploymentSelfTest.java | 306 ------- .../GridMultithreadedJobStealingSelfTest.java | 240 ------ .../grid/kernal/GridNodeFilterSelfTest.java | 78 -- .../grid/kernal/GridNodeLocalSelfTest.java | 65 -- .../kernal/GridNodeVisorAttributesSelfTest.java | 114 --- .../kernal/GridNonHistoryMetricsSelfTest.java | 124 --- .../grid/kernal/GridProjectionAbstractTest.java | 768 ------------------ .../kernal/GridProjectionForCachesSelfTest.java | 256 ------ ...ectionLocalJobMultipleArgumentsSelfTest.java | 156 ---- .../grid/kernal/GridProjectionSelfTest.java | 145 ---- .../grid/kernal/GridReduceSelfTest.java | 194 ----- .../grid/kernal/GridReleaseTypeSelfTest.java | 135 ---- .../kernal/GridRuntimeExceptionSelfTest.java | 302 ------- .../grid/kernal/GridSameVmStartupSelfTest.java | 107 --- .../org/gridgain/grid/kernal/GridSelfTest.java | 166 ---- .../grid/kernal/GridSpiExceptionSelfTest.java | 174 ---- .../grid/kernal/GridStartStopSelfTest.java | 182 ----- .../gridgain/grid/kernal/GridStartupMain.java | 57 -- .../gridgain/grid/kernal/GridStartupTest.java | 69 -- .../grid/kernal/GridStopWithCancelSelfTest.java | 122 --- .../grid/kernal/GridStopWithWaitSelfTest.java | 263 ------ .../GridTaskCancelSingleNodeSelfTest.java | 185 ----- .../GridTaskContinuousMapperSelfTest.java | 
331 -------- .../GridTaskExecutionContextSelfTest.java | 178 ---- .../grid/kernal/GridTaskExecutionSelfTest.java | 68 -- .../grid/kernal/GridTaskFailoverSelfTest.java | 117 --- .../GridTaskFutureImplStopGridSelfTest.java | 213 ----- .../GridTaskInstanceExecutionSelfTest.java | 121 --- .../kernal/GridTaskInstantiationSelfTest.java | 117 --- .../grid/kernal/GridTaskJobRejectSelfTest.java | 160 ---- .../grid/kernal/GridTaskListenerSelfTest.java | 106 --- .../grid/kernal/GridTaskMapAsyncSelfTest.java | 141 ---- .../kernal/GridTaskNameAnnotationSelfTest.java | 124 --- .../kernal/GridTaskResultCacheSelfTest.java | 129 --- .../grid/kernal/GridTaskTimeoutSelfTest.java | 223 ------ .../kernal/GridTopicExternalizableSelfTest.java | 166 ---- .../GridTopologyBuildVersionSelfTest.java | 102 --- .../grid/kernal/GridVersionSelfTest.java | 69 -- .../java/org/gridgain/grid/kernal/package.html | 24 - .../org/gridgain/grid/kernal/updatestatus.html | 28 - .../ignite/internal/GridFactorySelfTest.java | 2 +- 182 files changed, 17561 insertions(+), 17563 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/ClusterMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterMetricsSelfTest.java new file mode 100644 index 0000000..8247802 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterMetricsSelfTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Tests for projection metrics. + */ +@GridCommonTest(group = "Kernal Self") +public class ClusterMetricsSelfTest extends GridCommonAbstractTest { + /** */ + private static final int NODES_CNT = 4; + + /** */ + private static final int ITER_CNT = 30; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < NODES_CNT; i++) + startGrid(i); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(); + cfg.setIncludeProperties(); + cfg.setMetricsUpdateFrequency(0); + + return cfg; + } + + /** + * @throws Exception In case of error. 
+ */ + public void testEmptyProjection() throws Exception { + try { + grid(0).forPredicate(F.<ClusterNode>alwaysFalse()).metrics(); + + assert false; + } + catch (ClusterGroupEmptyException e) { + info("Caught expected exception: " + e); + } + } + + /** + * + */ + public void testTaskExecution() { + for (int i = 0; i < ITER_CNT; i++) { + info("Starting new iteration: " + i); + + try { + performTaskExecutionTest(); + } + catch (Throwable t) { + error("Iteration failed: " + i, t); + + fail("Test failed (see logs for details)."); + } + } + } + + /** + * @throws Exception In case of error. + */ + private void performTaskExecutionTest() throws Exception { + Ignite g = grid(0); + + JobFinishLock jobFinishLock = new JobFinishLock(); + + MetricsUpdateLock metricsUpdLock = new MetricsUpdateLock(); + + try { + for (Ignite g0 : G.allGrids()) + g0.events().localListen(jobFinishLock, EVT_JOB_FINISHED); + + g.compute().execute(new GridTestTask(), "testArg"); + + // Wait until all nodes fire JOB FINISH event. + jobFinishLock.await(); + + g.events().localListen(metricsUpdLock, EVT_NODE_METRICS_UPDATED); + + // Wait until local node will have updated metrics. + metricsUpdLock.await(); + + ClusterMetrics m = g.cluster().metrics(); + + checkMetrics(m); + } + finally { + for (Ignite g0 : G.allGrids()) + g0.events().stopLocalListen(jobFinishLock); + + g.events().stopLocalListen(metricsUpdLock); + } + } + + /** + * @param m Metrics. 
+ */ + @SuppressWarnings({"FloatingPointEquality"}) + private void checkMetrics(ClusterMetrics m) { + assert m.getTotalNodes() == NODES_CNT; + assert m.getTotalHosts() == 1; + + assert m.getMinimumActiveJobs() == 0; + assert m.getMaximumActiveJobs() == 0; + assert m.getAverageActiveJobs() == 0; + + assert m.getMinimumCancelledJobs() == 0; + assert m.getMaximumCancelledJobs() == 0; + assert m.getAverageCancelledJobs() == 0; + + assert m.getMinimumRejectedJobs() == 0; + assert m.getMaximumRejectedJobs() == 0; + assert m.getAverageRejectedJobs() == 0; + + assert m.getMinimumWaitingJobs() == 0; + assert m.getMaximumWaitingJobs() == 0; + assert m.getAverageWaitingJobs() == 0; + + assert m.getMinimumJobExecuteTime() >= 0; + assert m.getMaximumJobExecuteTime() >= 0; + assert m.getAverageJobExecuteTime() >= 0; + + assert m.getAverageJobExecuteTime() >= m.getMinimumJobExecuteTime() && + m.getAverageJobExecuteTime() <= m.getMaximumJobExecuteTime(); + + assert m.getMinimumJobWaitTime() >= 0; + assert m.getMaximumJobWaitTime() >= 0; + assert m.getAverageJobWaitTime() >= 0; + + assert m.getAverageJobWaitTime() >= m.getMinimumJobWaitTime() && + m.getAverageJobWaitTime() <= m.getMaximumJobWaitTime(); + + assert m.getMinimumDaemonThreadCount() > 0; + assert m.getMaximumDaemonThreadCount() > 0; + assert m.getAverageDaemonThreadCount() > 0; + + assert m.getAverageDaemonThreadCount() >= m.getMinimumDaemonThreadCount() && + m.getAverageDaemonThreadCount() <= m.getMaximumDaemonThreadCount(); + + assert m.getMinimumThreadCount() > 0; + assert m.getMaximumThreadCount() > 0; + assert m.getAverageThreadCount() > 0; + + assert m.getAverageThreadCount() >= m.getMinimumThreadCount() && + m.getAverageThreadCount() <= m.getMaximumThreadCount(); + + assert m.getMinimumIdleTime() >= 0; + assert m.getMaximumIdleTime() >= 0; + assert m.getAverageIdleTime() >= 0; + assert m.getIdleTimePercentage() >= 0; + assert m.getIdleTimePercentage() <= 1; + + assert m.getAverageIdleTime() >= 
m.getMinimumIdleTime() && m.getAverageIdleTime() <= m.getMaximumIdleTime(); + + assert m.getMinimumBusyTimePercentage() > 0; + assert m.getMaximumBusyTimePercentage() > 0; + assert m.getAverageBusyTimePercentage() > 0; + + assert m.getAverageBusyTimePercentage() >= m.getMinimumBusyTimePercentage() && + m.getAverageBusyTimePercentage() <= m.getMaximumBusyTimePercentage(); + + assert m.getMinimumCpuLoad() >= 0 || m.getMinimumCpuLoad() == -1.0; + assert m.getMaximumCpuLoad() >= 0 || m.getMaximumCpuLoad() == -1.0; + assert m.getAverageCpuLoad() >= 0 || m.getAverageCpuLoad() == -1.0; + + assert m.getAverageCpuLoad() >= m.getMinimumCpuLoad() && m.getAverageCpuLoad() <= m.getMaximumCpuLoad(); + + assert m.getMinimumHeapMemoryCommitted() > 0; + assert m.getMaximumHeapMemoryCommitted() > 0; + assert m.getAverageHeapMemoryCommitted() > 0; + + assert m.getAverageHeapMemoryCommitted() >= m.getMinimumHeapMemoryCommitted() && + m.getAverageHeapMemoryCommitted() <= m.getMaximumHeapMemoryCommitted(); + + assert m.getMinimumHeapMemoryUsed() > 0; + assert m.getMaximumHeapMemoryUsed() > 0; + assert m.getAverageHeapMemoryUsed() > 0; + + assert m.getAverageHeapMemoryUsed() >= m.getMinimumHeapMemoryUsed() && + m.getAverageHeapMemoryUsed() <= m.getMaximumHeapMemoryUsed(); + + assert m.getMinimumHeapMemoryMaximum() > 0; + assert m.getMaximumHeapMemoryMaximum() > 0; + assert m.getAverageHeapMemoryMaximum() > 0; + + assert m.getAverageHeapMemoryMaximum() >= m.getMinimumHeapMemoryMaximum() && + m.getAverageHeapMemoryMaximum() <= m.getMaximumHeapMemoryMaximum(); + + assert m.getMinimumHeapMemoryInitialized() >= 0; + assert m.getMaximumHeapMemoryInitialized() >= 0; + assert m.getAverageHeapMemoryInitialized() >= 0; + + assert m.getAverageHeapMemoryInitialized() >= m.getMinimumHeapMemoryInitialized() && + m.getAverageHeapMemoryInitialized() <= m.getMaximumHeapMemoryInitialized(); + + assert m.getMinimumNonHeapMemoryCommitted() > 0; + assert m.getMaximumNonHeapMemoryCommitted() > 0; + assert 
m.getAverageNonHeapMemoryCommitted() > 0; + + assert m.getAverageNonHeapMemoryCommitted() >= m.getMinimumNonHeapMemoryCommitted() && + m.getAverageNonHeapMemoryCommitted() <= m.getMaximumNonHeapMemoryCommitted(); + + assert m.getMinimumNonHeapMemoryUsed() > 0; + assert m.getMaximumNonHeapMemoryUsed() > 0; + assert m.getAverageNonHeapMemoryUsed() > 0; + + assert m.getAverageNonHeapMemoryUsed() >= m.getMinimumNonHeapMemoryUsed() && + m.getAverageNonHeapMemoryUsed() <= m.getMaximumNonHeapMemoryUsed(); + + assert m.getMinimumNonHeapMemoryMaximum() > 0; + assert m.getMaximumNonHeapMemoryMaximum() > 0; + assert m.getAverageNonHeapMemoryMaximum() > 0; + + assert m.getAverageNonHeapMemoryMaximum() >= m.getMinimumNonHeapMemoryMaximum() && + m.getAverageNonHeapMemoryMaximum() <= m.getMaximumNonHeapMemoryMaximum(); + + assert m.getMinimumNonHeapMemoryInitialized() > 0; + assert m.getMaximumNonHeapMemoryInitialized() > 0; + assert m.getAverageNonHeapMemoryInitialized() > 0; + + assert m.getAverageNonHeapMemoryInitialized() >= m.getMinimumNonHeapMemoryInitialized() && + m.getAverageNonHeapMemoryInitialized() <= m.getMaximumNonHeapMemoryInitialized(); + + assert m.getYoungestNodeStartTime() > 0; + assert m.getOldestNodeStartTime() > 0; + + assert m.getYoungestNodeStartTime() > m.getOldestNodeStartTime(); + + assert m.getMinimumUpTime() > 0; + assert m.getMaximumUpTime() > 0; + assert m.getAverageUpTime() > 0; + + assert m.getAverageUpTime() >= m.getMinimumUpTime() && m.getAverageUpTime() <= m.getMaximumUpTime(); + + assert m.getMinimumCpusPerNode() > 0; + assert m.getMaximumCpusPerNode() > 0; + assert m.getAverageCpusPerNode() > 0; + + assert m.getAverageCpusPerNode() == m.getMinimumCpusPerNode() && + m.getAverageCpusPerNode() == m.getMaximumCpusPerNode(); + + assert m.getMinimumNodesPerHost() == NODES_CNT; + assert m.getMaximumNodesPerHost() == NODES_CNT; + assert m.getAverageNodesPerHost() == NODES_CNT; + + assert m.getTotalCpus() > 0; + assert m.getTotalHosts() == 1; + assert 
m.getTotalNodes() == NODES_CNT; + } + + /** + * + */ + private static class JobFinishLock implements IgnitePredicate<IgniteEvent> { + /** Latch. */ + private final CountDownLatch latch = new CountDownLatch(NODES_CNT); + + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_JOB_FINISHED; + + latch.countDown(); + + return true; + } + + /** + * Waits until all nodes fire EVT_JOB_FINISHED. + * + * @throws InterruptedException If interrupted. + */ + public void await() throws InterruptedException { + latch.await(); + } + } + + /** + * + */ + private static class MetricsUpdateLock implements IgnitePredicate<IgniteEvent> { + /** Latch. */ + private final CountDownLatch latch = new CountDownLatch(NODES_CNT * 2); + + /** */ + private final Map<UUID, Integer> metricsRcvdCnt = new HashMap<>(); + + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; + + Integer cnt = F.addIfAbsent(metricsRcvdCnt, discoEvt.eventNode().id(), 0); + + assert cnt != null; + + if (cnt < 2) { + latch.countDown(); + + metricsRcvdCnt.put(discoEvt.eventNode().id(), ++cnt); + } + + return true; + } + + /** + * Waits until all metrics will be received twice from all nodes in + * topology. + * + * @throws InterruptedException If interrupted. 
+ */ + public void await() throws InterruptedException { + latch.await(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsSelfTest.java new file mode 100644 index 0000000..c37121e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsSelfTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.messaging.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Grid node metrics self test. + */ +@GridCommonTest(group = "Kernal Self") +public class ClusterNodeMetricsSelfTest extends GridCommonAbstractTest { + /** Test message size. */ + private static final int MSG_SIZE = 1024; + + /** Number of messages. */ + private static final int MSG_CNT = 3; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(); + cfg.setMetricsUpdateFrequency(0); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSingleTaskMetrics() throws Exception { + Ignite ignite = grid(); + + ignite.compute().execute(new GridTestTask(), "testArg"); + + // Let metrics update twice. + final CountDownLatch latch = new CountDownLatch(2); + + ignite.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_NODE_METRICS_UPDATED; + + latch.countDown(); + + return true; + } + }, EVT_NODE_METRICS_UPDATED); + + // Wait for metrics update. 
+ latch.await(); + + ClusterNodeMetrics metrics = ignite.cluster().localNode().metrics(); + + info("Node metrics: " + metrics); + + assert metrics.getAverageActiveJobs() > 0; + assert metrics.getAverageCancelledJobs() == 0; + assert metrics.getAverageJobExecuteTime() >= 0; + assert metrics.getAverageJobWaitTime() >= 0; + assert metrics.getAverageRejectedJobs() == 0; + assert metrics.getAverageWaitingJobs() == 0; + assert metrics.getCurrentActiveJobs() == 0; + assert metrics.getCurrentCancelledJobs() == 0; + assert metrics.getCurrentJobExecuteTime() == 0; + assert metrics.getCurrentJobWaitTime() == 0; + assert metrics.getCurrentWaitingJobs() == 0; + assert metrics.getMaximumActiveJobs() == 1; + assert metrics.getMaximumCancelledJobs() == 0; + assert metrics.getMaximumJobExecuteTime() >= 0; + assert metrics.getMaximumJobWaitTime() >= 0; + assert metrics.getMaximumRejectedJobs() == 0; + assert metrics.getMaximumWaitingJobs() == 0; + assert metrics.getTotalCancelledJobs() == 0; + assert metrics.getTotalExecutedJobs() == 1; + assert metrics.getTotalRejectedJobs() == 0; + assert metrics.getTotalExecutedTasks() == 1; + + assertTrue("MaximumJobExecuteTime=" + metrics.getMaximumJobExecuteTime() + + " is less than AverageJobExecuteTime=" + metrics.getAverageJobExecuteTime(), + metrics.getMaximumJobExecuteTime() >= metrics.getAverageJobExecuteTime()); + } + + /** + * @throws Exception If failed. + */ + public void testInternalTaskMetrics() throws Exception { + Ignite ignite = grid(); + + // Visor task is internal and should not affect metrics. + ignite.compute().withName("visor-test-task").execute(new TestInternalTask(), "testArg"); + + // Let metrics update twice. 
+ final CountDownLatch latch = new CountDownLatch(2); + + ignite.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_NODE_METRICS_UPDATED; + + latch.countDown(); + + return true; + } + }, EVT_NODE_METRICS_UPDATED); + + // Wait for metrics update. + latch.await(); + + ClusterNodeMetrics metrics = ignite.cluster().localNode().metrics(); + + info("Node metrics: " + metrics); + + assert metrics.getAverageActiveJobs() == 0; + assert metrics.getAverageCancelledJobs() == 0; + assert metrics.getAverageJobExecuteTime() == 0; + assert metrics.getAverageJobWaitTime() == 0; + assert metrics.getAverageRejectedJobs() == 0; + assert metrics.getAverageWaitingJobs() == 0; + assert metrics.getCurrentActiveJobs() == 0; + assert metrics.getCurrentCancelledJobs() == 0; + assert metrics.getCurrentJobExecuteTime() == 0; + assert metrics.getCurrentJobWaitTime() == 0; + assert metrics.getCurrentWaitingJobs() == 0; + assert metrics.getMaximumActiveJobs() == 0; + assert metrics.getMaximumCancelledJobs() == 0; + assert metrics.getMaximumJobExecuteTime() == 0; + assert metrics.getMaximumJobWaitTime() == 0; + assert metrics.getMaximumRejectedJobs() == 0; + assert metrics.getMaximumWaitingJobs() == 0; + assert metrics.getTotalCancelledJobs() == 0; + assert metrics.getTotalExecutedJobs() == 0; + assert metrics.getTotalRejectedJobs() == 0; + assert metrics.getTotalExecutedTasks() == 0; + + assertTrue("MaximumJobExecuteTime=" + metrics.getMaximumJobExecuteTime() + + " is less than AverageJobExecuteTime=" + metrics.getAverageJobExecuteTime(), + metrics.getMaximumJobExecuteTime() >= metrics.getAverageJobExecuteTime()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testIoMetrics() throws Exception { + Ignite ignite0 = grid(); + Ignite ignite1 = startGrid(1); + + Object msg = new TestMessage(); + + int size = ignite0.configuration().getMarshaller().marshal(msg).length; + + assert size > MSG_SIZE; + + final CountDownLatch latch = new CountDownLatch(MSG_CNT); + + ignite0.message().localListen(null, new MessagingListenActor<TestMessage>() { + @Override protected void receive(UUID nodeId, TestMessage rcvMsg) throws Throwable { + latch.countDown(); + } + }); + + ignite1.message().localListen(null, new MessagingListenActor<TestMessage>() { + @Override protected void receive(UUID nodeId, TestMessage rcvMsg) throws Throwable { + respond(rcvMsg); + } + }); + + for (int i = 0; i < MSG_CNT; i++) + message(ignite0.cluster().forRemotes()).send(null, msg); + + latch.await(); + + ClusterNodeMetrics metrics = ignite0.cluster().localNode().metrics(); + + info("Node 0 metrics: " + metrics); + + // Time sync messages are being sent. + assert metrics.getSentMessagesCount() >= MSG_CNT; + assert metrics.getSentBytesCount() > size * MSG_CNT; + assert metrics.getReceivedMessagesCount() >= MSG_CNT; + assert metrics.getReceivedBytesCount() > size * MSG_CNT; + + metrics = ignite1.cluster().localNode().metrics(); + + info("Node 1 metrics: " + metrics); + + // Time sync messages are being sent. + assert metrics.getSentMessagesCount() >= MSG_CNT; + assert metrics.getSentBytesCount() > size * MSG_CNT; + assert metrics.getReceivedMessagesCount() >= MSG_CNT; + assert metrics.getReceivedBytesCount() > size * MSG_CNT; + } + + /** + * Test message. + */ + @SuppressWarnings("UnusedDeclaration") + private static class TestMessage implements Serializable { + /** */ + private final byte[] arr = new byte[MSG_SIZE]; + } + + /** + * Test internal task. + */ + @GridInternal + private static class TestInternalTask extends GridTestTask { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityMappedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityMappedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityMappedTest.java new file mode 100644 index 0000000..c97ebe7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityMappedTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests affinity mapping when {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper} is used. + */ +public class GridAffinityMappedTest extends GridCommonAbstractTest { + /** VM ip finder for TCP discovery. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * + */ + public GridAffinityMappedTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); + disco.setIpFinder(ipFinder); + cfg.setDiscoverySpi(disco); + + if (gridName.endsWith("1")) + cfg.setCacheConfiguration(); // Empty cache configuration. + else { + assert gridName.endsWith("2") || gridName.endsWith("3"); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAffinity(new MockCacheAffinityFunction()); + cacheCfg.setAffinityMapper(new MockCacheAffinityKeyMapper()); + + cfg.setCacheConfiguration(cacheCfg); + cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, gridName.endsWith("2") ? 
0 : 1)); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(1); + startGrid(2); + startGrid(3); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(1); + stopGrid(2); + stopGrid(3); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testMappedAffinity() throws IgniteCheckedException { + Ignite g1 = grid(1); + Ignite g2 = grid(2); + Ignite g3 = grid(3); + + assert g1.configuration().getCacheConfiguration().length == 0; + assert g2.configuration().getCacheConfiguration()[0].getCacheMode() == PARTITIONED; + assert g3.configuration().getCacheConfiguration()[0].getCacheMode() == PARTITIONED; + + ClusterNode first = g2.cluster().localNode(); + ClusterNode second = g3.cluster().localNode(); + + //When MockCacheAfinity and MockCacheAffinityKeyMapper are set to cache configuration we expect the following. + //Key 0 is mapped to partition 0, first node. + //Key 1 is mapped to partition 1, second node. + //key 2 is mapped to partition 0, first node because mapper substitutes key 2 with affinity key 0. 
+ Map<ClusterNode, Collection<Integer>> map = g1.cluster().mapKeysToNodes(null, F.asList(0)); + + assertNotNull(map); + assertEquals("Invalid map size: " + map.size(), 1, map.size()); + assertEquals(F.first(map.keySet()), first); + + UUID id1 = g1.cluster().mapKeyToNode(null, 1).id(); + + assertNotNull(id1); + assertEquals(second.id(), id1); + + UUID id2 = g1.cluster().mapKeyToNode(null, 2).id(); + + assertNotNull(id2); + assertEquals(first.id(), id2); + } + + /** + * Mock affinity implementation that ensures constant key-to-node mapping based on {@link GridCacheModuloAffinityFunction} + * The partition selection is as follows: 0 maps to partition 0 and any other value maps to partition 1 + */ + private static class MockCacheAffinityFunction extends GridCacheModuloAffinityFunction { + /** + * Initializes module affinity with 2 parts and 0 backups + */ + private MockCacheAffinityFunction() { + super(2, 0); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + return Integer.valueOf(0) == key ? 0 : 1; + } + + /** {@inheritDoc} */ + @Override public void reset() { + //no-op + } + } + + /** + * Mock affinity mapper implementation that substitutes values other than 0 and 1 with 0. + */ + private static class MockCacheAffinityKeyMapper implements GridCacheAffinityKeyMapper { + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + return key instanceof Integer ? 1 == (Integer)key ? key : 0 : key; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // This mapper is stateless and needs no initialization logic. 
+ } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityP2PSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityP2PSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityP2PSelfTest.java new file mode 100644 index 0000000..4cf3487 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityP2PSelfTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.config.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.net.*; +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests affinity and affinity mapper P2P loading. + */ +public class GridAffinityP2PSelfTest extends GridCommonAbstractTest { + /** VM ip finder for TCP discovery. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String EXT_AFFINITY_MAPPER_CLS_NAME = "org.gridgain.grid.tests.p2p.GridExternalAffinityMapper"; + + /** */ + private static final String EXT_AFFINITY_CLS_NAME = "org.gridgain.grid.tests.p2p.GridExternalAffinity"; + + /** URL of classes. */ + private static final URL[] URLS; + + /** Current deployment mode. Used in {@link #getConfiguration(String)}. */ + private IgniteDeploymentMode depMode; + + /** + * Initialize URLs. 
+ */ + static { + try { + URLS = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; + } + catch (MalformedURLException e) { + throw new RuntimeException("Define property p2p.uri.cls", e); + } + } + + /** + * + */ + public GridAffinityP2PSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setDeploymentMode(depMode); + + if (gridName.endsWith("1")) + c.setCacheConfiguration(); // Empty cache configuration. + else { + assert gridName.endsWith("2") || gridName.endsWith("3"); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + + GridTestExternalClassLoader ldr = new GridTestExternalClassLoader(URLS); + + cc.setAffinity((GridCacheAffinityFunction)ldr.loadClass(EXT_AFFINITY_CLS_NAME).newInstance()); + cc.setAffinityMapper((GridCacheAffinityKeyMapper)ldr.loadClass(EXT_AFFINITY_MAPPER_CLS_NAME) + .newInstance()); + + c.setCacheConfiguration(cc); + c.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, gridName.endsWith("2") ? 0 : 1)); + } + + return c; + } + + /** + * Test {@link org.apache.ignite.configuration.IgniteDeploymentMode#PRIVATE} mode. + * + * @throws Exception if error occur. + */ + public void testPrivateMode() throws Exception { + depMode = IgniteDeploymentMode.PRIVATE; + + affinityTest(); + } + + /** + * Test {@link org.apache.ignite.configuration.IgniteDeploymentMode#ISOLATED} mode. + * + * @throws Exception if error occur. 
+ */ + public void testIsolatedMode() throws Exception { + depMode = IgniteDeploymentMode.ISOLATED; + + affinityTest(); + } + + /** + * Test {@link org.apache.ignite.configuration.IgniteDeploymentMode#CONTINUOUS} mode. + * + * @throws Exception if error occur. + */ + public void testContinuousMode() throws Exception { + depMode = IgniteDeploymentMode.CONTINUOUS; + + affinityTest(); + } + + /** + * Test {@link org.apache.ignite.configuration.IgniteDeploymentMode#SHARED} mode. + * + * @throws Exception if error occur. + */ + public void testSharedMode() throws Exception { + depMode = IgniteDeploymentMode.SHARED; + + affinityTest(); + } + + /** @throws Exception If failed. */ + private void affinityTest() throws Exception { + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + Ignite g3 = startGrid(3); + + try { + assert g1.configuration().getCacheConfiguration().length == 0; + assert g2.configuration().getCacheConfiguration()[0].getCacheMode() == PARTITIONED; + assert g3.configuration().getCacheConfiguration()[0].getCacheMode() == PARTITIONED; + + ClusterNode first = g2.cluster().localNode(); + ClusterNode second = g3.cluster().localNode(); + + //When external affinity and mapper are set to cache configuration we expect the following. + //Key 0 is mapped to partition 0, first node. + //Key 1 is mapped to partition 1, second node. + //key 2 is mapped to partition 0, first node because mapper substitutes key 2 with affinity key 0. 
+ Map<ClusterNode, Collection<Integer>> map = g1.cluster().mapKeysToNodes(null, F.asList(0)); + + assertNotNull(map); + assertEquals("Invalid map size: " + map.size(), 1, map.size()); + assertEquals(F.first(map.keySet()), first); + + ClusterNode n1 = g1.cluster().mapKeyToNode(null, 1); + + assertNotNull(n1); + + UUID id1 = n1.id(); + + assertNotNull(id1); + assertEquals(second.id(), id1); + + ClusterNode n2 = g1.cluster().mapKeyToNode(null, 2); + + assertNotNull(n2); + + UUID id2 = n2.id(); + + assertNotNull(id2); + assertEquals(first.id(), id2); + } + finally { + stopGrid(1); + stopGrid(2); + stopGrid(3); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java new file mode 100644 index 0000000..adc4a32 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests affinity mapping. + */ +public class GridAffinitySelfTest extends GridCommonAbstractTest { + /** VM ip finder for TCP discovery. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + if (gridName.endsWith("1")) + cfg.setCacheConfiguration(); // Empty cache configuration. + else { + assert gridName.endsWith("2"); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setBackups(1); + + cfg.setCacheConfiguration(cacheCfg); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGridsMultiThreaded(1, 2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws IgniteCheckedException If failed. 
+ */ + public void testAffinity() throws IgniteCheckedException { + Ignite g1 = grid(1); + Ignite g2 = grid(2); + + assert caches(g1).size() == 0; + assert F.first(caches(g2)).getCacheMode() == PARTITIONED; + + Map<ClusterNode, Collection<String>> map = g1.cluster().mapKeysToNodes(null, F.asList("1")); + + assertNotNull(map); + assertEquals("Invalid map size: " + map.size(), 1, map.size()); + assertEquals(F.first(map.keySet()), g2.cluster().localNode()); + + UUID id1 = g1.cluster().mapKeyToNode(null, "2").id(); + + assertNotNull(id1); + assertEquals(g2.cluster().localNode().id(), id1); + + UUID id2 = g1.cluster().mapKeyToNode(null, "3").id(); + + assertNotNull(id2); + assertEquals(g2.cluster().localNode().id(), id2); + } + + /** + * @param g Grid. + * @return Non-system caches. + */ + private Collection<CacheConfiguration> caches(Ignite g) { + return F.view(Arrays.asList(g.configuration().getCacheConfiguration()), new IgnitePredicate<CacheConfiguration>() { + @Override public boolean apply(CacheConfiguration c) { + return c.getName() == null || !c.getName().equals(CU.UTILITY_CACHE_NAME); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridAlwaysFailoverSpiFailSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAlwaysFailoverSpiFailSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAlwaysFailoverSpiFailSelfTest.java new file mode 100644 index 0000000..2c235c1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAlwaysFailoverSpiFailSelfTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.failover.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Always failover SPI test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridAlwaysFailoverSpiFailSelfTest extends GridCommonAbstractTest { + /** */ + private boolean isFailoverCalled; + + /** */ + public GridAlwaysFailoverSpiFailSelfTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + cfg.setFailoverSpi(new GridTestFailoverSpi()); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings({"UnusedCatchParameter", "ThrowableInstanceNeverThrown"}) + public void testFailoverTask() throws Exception { + isFailoverCalled = false; + + Ignite ignite = G.ignite(getTestGridName()); + + ignite.compute().localDeployTask(GridTestFailoverTask.class, GridTestFailoverTask.class.getClassLoader()); + + try { + ignite.compute().execute(GridTestFailoverTask.class.getName(), + new ComputeExecutionRejectedException("Task should be failed over")); + + assert false; + } + catch (IgniteCheckedException e) { + //No-op + } + + assert isFailoverCalled; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"UnusedCatchParameter", "ThrowableInstanceNeverThrown"}) + public void testNoneFailoverTask() throws Exception { + isFailoverCalled = false; + + Ignite ignite = G.ignite(getTestGridName()); + + ignite.compute().localDeployTask(GridTestFailoverTask.class, GridTestFailoverTask.class.getClassLoader()); + + try { + ignite.compute().execute(GridTestFailoverTask.class.getName(), + new IgniteCheckedException("Task should NOT be failed over")); + + assert false; + } + catch (IgniteCheckedException e) { + //No-op + } + + assert !isFailoverCalled; + } + + /** */ + private class GridTestFailoverSpi extends AlwaysFailoverSpi { + /** {@inheritDoc} */ + @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> grid) { + isFailoverCalled = true; + + return super.failover(ctx, grid); + } + } + + /** + * Task which splits to the jobs that always fail. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static final class GridTestFailoverTask extends ComputeTaskSplitAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override public Collection<? 
extends ComputeJob> split(int gridSize, Object arg) { + assert gridSize == 1; + assert arg instanceof IgniteCheckedException; + + Collection<ComputeJob> res = new ArrayList<>(gridSize); + + for (int i = 0; i < gridSize; i++) + res.add(new GridTestFailoverJob((IgniteCheckedException)arg)); + + return res; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, + List<ComputeJobResult> received) throws IgniteCheckedException { + if (res.getException() != null) + return ComputeJobResultPolicy.FAILOVER; + + return super.result(res, received); + } + + /** {@inheritDoc} */ + @Override public Serializable reduce(List<ComputeJobResult> results) { + return null; + } + } + + /** + * Job that always throws exception. + */ + private static class GridTestFailoverJob extends ComputeJobAdapter { + /** + * @param ex Exception to be thrown in {@link #execute}. + */ + GridTestFailoverJob(IgniteCheckedException ex) { super(ex); } + + /** {@inheritDoc} */ + @Override public IgniteCheckedException execute() throws IgniteCheckedException { + throw this.<IgniteCheckedException>argument(0); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridCacheProjectionRemoveTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCacheProjectionRemoveTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCacheProjectionRemoveTest.java new file mode 100644 index 0000000..0d46dc2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCacheProjectionRemoveTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+
+/**
+ * Checks the two-argument {@code remove(key, val)} call on a single-grid cache.
+ */
+public class GridCacheProjectionRemoveTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1; // A single grid is enough for this test.
+    }
+
+    /**
+     * Puts one entry, then expects the first {@code remove("key", 1)} to report success and an
+     * identical second call to report failure.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testRemove() throws IgniteCheckedException {
+        cache().put("key", 1);
+
+        // NOTE(review): both remove calls live inside assert expressions, so they are only
+        // executed when assertions are enabled (-ea).
+        assert cache().remove("key", 1); // Entry is present with value 1.
+        assert !cache().remove("key", 1); // Presumably fails because the entry is already gone - confirm.
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
new file mode 100644
index 0000000..730225f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Test task cancellation on grid stop. + */ +@SuppressWarnings({"ProhibitedExceptionDeclared"}) +@GridCommonTest(group = "Kernal Self") +public class GridCancelOnGridStopSelfTest extends GridCommonAbstractTest { + /** */ + private static CountDownLatch cnt; + + /** */ + private static boolean cancelCall; + + /** */ + public GridCancelOnGridStopSelfTest() { + super(false); + } + + /** + * @throws Exception If failed. + */ + public void testCancelingJob() throws Exception { + cancelCall = false; + + try (Ignite g = startGrid(1)) { + cnt = new CountDownLatch(1); + + g.compute().enableAsync().execute(CancelledTask.class, null); + + cnt.await(); + } + + assert cancelCall; + } + + /** + * Cancelled task. + */ + private static final class CancelledTask extends ComputeTaskAdapter<String, Void> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable String arg) + throws IgniteCheckedException { + for (ClusterNode node : subgrid) { + if (node.id().equals(ignite.configuration().getNodeId())) { + return Collections.singletonMap(new ComputeJob() { + @Override public void cancel() { + cancelCall = true; + } + + @Override public Serializable execute() throws IgniteCheckedException { + cnt.countDown(); + + try { + Thread.sleep(Long.MAX_VALUE); + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + + return null; + } + }, node); + } + } + + throw new IgniteCheckedException("Local node not found"); + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java new file mode 100644 index 0000000..a751b11 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Cancel unused job test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest { + /** */ + private static final int WAIT_TIME = 100000; + + /** */ + public static final int SPLIT_COUNT = 10; + + /** */ + private static volatile int cancelCnt; + + /** */ + private static volatile int processedCnt; + + /** */ + private static CountDownLatch startSignal = new CountDownLatch(SPLIT_COUNT); + + /** */ + private static CountDownLatch stopSignal = new CountDownLatch(SPLIT_COUNT); + + /** */ + public GridCancelUnusedJobSelfTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + c.setDiscoverySpi(discoSpi); + + c.setExecutorService( + new ThreadPoolExecutor( + SPLIT_COUNT, + SPLIT_COUNT, + 0, 
TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>())); + + c.setExecutorServiceShutdown(true); + + return c; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testCancel() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + ignite.compute().localDeployTask(GridCancelTestTask.class, U.detectClassLoader(GridCancelTestTask.class)); + + ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridCancelTestTask.class.getName(), null); + + // Wait until jobs begin execution. + boolean await = startSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS); + + assert await : "Jobs did not start."; + + info("Test task result: " + fut); + + assert fut != null; + + // Only first job should successfully complete. + Object res = fut.get(); + assert (Integer)res == 1; + + // Wait for all jobs to finish. + await = stopSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS); + assert await : "Jobs did not stop."; + + // One is definitely processed. But there might be some more processed or cancelled or processed and cancelled. + // Thus total number should be at least SPLIT_COUNT and at most (SPLIT_COUNT - 1) *2 +1 + assert (cancelCnt + processedCnt) >= SPLIT_COUNT && (cancelCnt + processedCnt) <= (SPLIT_COUNT - 1) * 2 +1 : + "Invalid cancel count value: " + cancelCnt; + } + + /** + * + */ + private static class GridCancelTestTask extends ComputeTaskSplitAdapter<Object, Object> { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override protected Collection<? 
extends ComputeJob> split(int gridSize, Object arg) throws IgniteCheckedException { + if (log.isInfoEnabled()) + log.info("Splitting job [job=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']'); + + Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT); + + for (int i = 1; i <= SPLIT_COUNT; i++) + jobs.add(new GridCancelTestJob(i)); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) { + return ComputeJobResultPolicy.REDUCE; + } + + /** {@inheritDoc} */ + @Override public Serializable reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + if (log.isInfoEnabled()) + log.info("Reducing job [job=" + this + ", results=" + results + ']'); + + if (results.size() > 1) + fail(); + + return results.get(0).getData(); + } + } + + /** + * Cancel test job. + */ + private static class GridCancelTestJob extends ComputeJobAdapter { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + /** + * @param arg Argument. 
+ */ + private GridCancelTestJob(Integer arg) { + super(arg); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() { + int arg = this.<Integer>argument(0); + + try { + if (log.isInfoEnabled()) + log.info("Executing job [job=" + this + ", arg=" + arg + ']'); + + startSignal.countDown(); + + try { + if (!startSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS)) + fail(); + + if (arg == 1) { + if (log.isInfoEnabled()) + log.info("Job one is proceeding."); + } + else + Thread.sleep(WAIT_TIME); + } + catch (InterruptedException e) { + if (log.isInfoEnabled()) + log.info("Job got cancelled [arg=" + arg + ", ses=" + ses + ", e=" + e + ']'); + + return 0; + } + + if (log.isInfoEnabled()) + log.info("Completing job: " + ses); + + return argument(0); + } + finally { + stopSignal.countDown(); + + processedCnt++; + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + cancelCnt++; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java new file mode 100644 index 0000000..d37c57d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.collision.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * Cancelled jobs metrics self test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest { + + /** */ + private static GridCancelCollisionSpi colSpi = new GridCancelCollisionSpi(); + + /** */ + public GridCancelledJobsMetricsSelfTest() { + super(true); + } + + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + cfg.setCollisionSpi(colSpi); + + DiscoverySpi discoSpi = cfg.getDiscoverySpi(); + + assert discoSpi instanceof TcpDiscoverySpi; + + ((TcpDiscoverySpi)discoSpi).setHeartbeatFrequency(500); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testCancelledJobs() throws Exception { + final Ignite ignite = G.ignite(getTestGridName()); + + Collection<ComputeTaskFuture<?>> futs = new ArrayList<>(); + + IgniteCompute comp = ignite.compute().enableAsync(); + + for (int i = 1; i <= 10; i++) { + comp.execute(CancelledTask.class, null); + + futs.add(comp.future()); + } + + // Wait to be sure that metrics were updated. + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ignite.cluster().localNode().metrics().getTotalCancelledJobs() > 0; + } + }, 5000); + + colSpi.externalCollision(); + + for (ComputeTaskFuture<?> fut : futs) { + try { + fut.get(); + + assert false : "Job was not interrupted."; + } + catch (IgniteCheckedException e) { + if (e.hasCause(InterruptedException.class)) + throw new IgniteCheckedException("Test run has been interrupted.", e); + + info("Caught expected exception: " + e.getMessage()); + } + } + + // Job was cancelled and now we need to calculate metrics. + int totalCancelledJobs = ignite.cluster().localNode().metrics().getTotalCancelledJobs(); + + assert totalCancelledJobs == 10 : "Metrics were not updated. Expected 10 got " + totalCancelledJobs; + } + + /** + * + */ + private static final class CancelledTask extends ComputeTaskSplitAdapter<String, Object> { + /** {@inheritDoc} */ + @Override protected Collection<? 
extends ComputeJob> split(int gridSize, String arg) { + return Arrays.asList(new GridCancelledJob()); + } + + /** {@inheritDoc} */ + @Override public Object reduce(List<ComputeJobResult> results) { + assert results.get(0).isCancelled() : "Wrong job result status."; + + return null; + } + } + + /** + * + */ + private static final class GridCancelledJob extends ComputeJobAdapter { + /** {@inheritDoc} */ + @Override public String execute() throws IgniteCheckedException { + X.println("Executing job."); + + try { + Thread.sleep(Long.MAX_VALUE); + } + catch (InterruptedException ignored) { + try { + Thread.sleep(1000); + } + catch (InterruptedException e1) { + throw new IgniteCheckedException("Unexpected exception: ", e1); + } + + throw new IgniteCheckedException("Job got interrupted while waiting for cancellation."); + } + finally { + X.println("Finished job."); + } + + return null; + } + } + + /** + * + */ + @IgniteSpiMultipleInstancesSupport(true) + private static class GridCancelCollisionSpi extends IgniteSpiAdapter + implements CollisionSpi { + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private CollisionExternalListener lsnr; + + /** {@inheritDoc} */ + @Override public void onCollision(CollisionContext ctx) { + Collection<CollisionJobContext> activeJobs = ctx.activeJobs(); + Collection<CollisionJobContext> waitJobs = ctx.waitingJobs(); + + for (CollisionJobContext job : waitJobs) + job.activate(); + + for (CollisionJobContext job : activeJobs) { + log.info("Cancelling job : " + job.getJob()); + + job.cancel(); + } + } + + /** {@inheritDoc} */ + @Override public void spiStart(String gridName) throws IgniteSpiException { + // Start SPI start stopwatch. + startStopwatch(); + + // Ack start. + if (log.isInfoEnabled()) + log.info(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + // Ack stop. 
+ if (log.isInfoEnabled()) + log.info(stopInfo()); + } + + /** {@inheritDoc} */ + @Override public void setExternalCollisionListener(CollisionExternalListener lsnr) { + this.lsnr = lsnr; + } + + /** */ + public void externalCollision() { + CollisionExternalListener tmp = lsnr; + + if (tmp != null) + tmp.onExternalCollision(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridCollisionJobsContextSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCollisionJobsContextSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCollisionJobsContextSelfTest.java new file mode 100644 index 0000000..9f60f79 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCollisionJobsContextSelfTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.collision.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import java.util.*; + +/** + * Collision job context test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridCollisionJobsContextSelfTest extends GridCommonAbstractTest { + /** */ + public GridCollisionJobsContextSelfTest() { + super(/*start grid*/true); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + Ignite ignite = G.ignite(getTestGridName()); + + assert ignite != null; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCollisionSpi(new TestCollisionSpi()); + + return cfg; + } + + /** + * @throws Exception If test failed. + */ + public void testCollisionJobContext() throws Exception { + G.ignite(getTestGridName()).compute().execute(new GridTestTask(), "some-arg"); + } + + /** */ + @SuppressWarnings( {"PublicInnerClass"}) + @IgniteSpiMultipleInstancesSupport(true) + public static class TestCollisionSpi extends IgniteSpiAdapter implements CollisionSpi { + /** Grid logger. 
*/ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void onCollision(CollisionContext ctx) { + Collection<CollisionJobContext> activeJobs = ctx.activeJobs(); + Collection<CollisionJobContext> waitJobs = ctx.waitingJobs(); + + assert waitJobs != null; + assert activeJobs != null; + + + for (CollisionJobContext job : waitJobs) { + assert job.getJob() != null; + assert job.getJobContext() != null; + assert job.getTaskSession() != null; + + assert job.getJob() instanceof GridTestJob : job.getJob(); + + job.activate(); + } + } + + /** {@inheritDoc} */ + @Override public void spiStart(String gridName) throws IgniteSpiException { + // Start SPI start stopwatch. + startStopwatch(); + + // Ack start. + if (log.isInfoEnabled()) + log.info(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + // Ack stop. + if (log.isInfoEnabled()) + log.info(stopInfo()); + } + + /** {@inheritDoc} */ + @Override public void setExternalCollisionListener(CollisionExternalListener lsnr) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCommunicationSelfTest.java new file mode 100644 index 0000000..94d3787 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCommunicationSelfTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Grid basic communication test. + */ +@GridCommonTest(group = "Kernal Self") +public class GridCommunicationSelfTest extends GridCommonAbstractTest { + /** */ + private static Ignite ignite; + + /** */ + public GridCommunicationSelfTest() { + super(/*start grid*/true); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = G.ignite(getTestGridName()); + } + + /** + * @throws Exception If failed. + */ + public void testSendMessageToEmptyNodes() throws Exception { + Collection<ClusterNode> empty = Collections.emptyList(); + + try { + sendMessage(empty, 1); + } + catch (IllegalArgumentException ignored) { + // No-op. + } + } + + /** + * @param nodes Nodes to send message to. + * @param cntr Counter. + */ + private void sendMessage(Collection<ClusterNode> nodes, int cntr) { + try { + message(ignite.cluster().forNodes(nodes)).send(null, + new GridTestCommunicationMessage(cntr, ignite.cluster().localNode().id())); + } + catch (IgniteCheckedException e) { + error("Failed to send message.", e); + } + } + + /** + * Test message. 
+ */ + @SuppressWarnings({"PublicInnerClass"}) + public static class GridTestCommunicationMessage implements Serializable { + /** */ + private final int msgId; + + /** */ + private final UUID sndId; + + /** + * @param msgId Message id. + * @param sndId Sender id. + */ + public GridTestCommunicationMessage(int msgId, UUID sndId) { + assert sndId != null; + + this.msgId = msgId; + this.sndId = sndId; + } + + /** + * @return Message id. + */ + public int getMessageId() { + return msgId; + } + + /** + * @return Sender id. + */ + public UUID getSenderId() { + return sndId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder buf = new StringBuilder(); + + buf.append(getClass().getSimpleName()); + buf.append(" [msgId=").append(msgId); + buf.append(']'); + + return buf.toString(); + } + } +}