Merge branch 'sprint-2' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-141
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1e83a269 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1e83a269 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1e83a269 Branch: refs/heads/ignite-141 Commit: 1e83a269b530ca32c39dfee3e11c10396435604e Parents: 1bdd8aa d3a3a3e Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu Mar 5 10:33:29 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu Mar 5 10:33:29 2015 -0800 ---------------------------------------------------------------------- bin/setup-hadoop.bat | 2 +- bin/setup-hadoop.sh | 2 +- config/hadoop/default-config.xml | 10 +- docs/core-site.ignite.xml | 6 +- examples/config/filesystem/core-site.xml | 4 +- examples/config/filesystem/example-igfs.xml | 4 +- .../ignite/examples/igfs/IgfsExample.java | 18 +- .../examples/igfs/IgfsMapReduceExample.java | 6 +- .../src/main/java/org/apache/ignite/Ignite.java | 13 +- .../org/apache/ignite/IgniteFileSystem.java | 457 ++++ .../main/java/org/apache/ignite/IgniteFs.java | 346 --- .../configuration/FileSystemConfiguration.java | 805 ++++++ .../configuration/HadoopConfiguration.java | 173 ++ .../ignite/configuration/IgfsConfiguration.java | 807 ------ .../configuration/IgniteConfiguration.java | 19 +- .../org/apache/ignite/events/CacheEvent.java | 20 +- .../main/java/org/apache/ignite/igfs/Igfs.java | 216 -- .../apache/ignite/igfs/IgfsBlockLocation.java | 2 +- .../java/org/apache/ignite/igfs/IgfsFile.java | 2 +- .../org/apache/ignite/igfs/IgfsInputStream.java | 6 +- .../org/apache/ignite/igfs/IgfsMetrics.java | 2 +- .../java/org/apache/ignite/igfs/IgfsMode.java | 8 +- .../java/org/apache/ignite/igfs/IgfsReader.java | 38 - .../mapreduce/IgfsInputStreamJobAdapter.java | 4 +- .../apache/ignite/igfs/mapreduce/IgfsJob.java | 4 +- .../igfs/mapreduce/IgfsRecordResolver.java | 2 +- .../apache/ignite/igfs/mapreduce/IgfsTask.java | 4 +- .../ignite/igfs/mapreduce/IgfsTaskArgs.java | 2 +- .../IgfsByteDelimiterRecordResolver.java | 3 +- .../records/IgfsFixedLengthRecordResolver.java | 2 +- .../igfs/secondary/IgfsSecondaryFileSystem.java | 201 ++ ...fsSecondaryFileSystemPositionedReadable.java | 38 + .../apache/ignite/igfs/secondary/package.html | 24 + .../ignite/internal/GridKernalContext.java | 32 +- .../ignite/internal/GridKernalContextImpl.java | 31 +- .../ignite/internal/GridUpdateNotifier.java | 16 +- .../ignite/internal/IgniteComponentType.java | 4 +- .../org/apache/ignite/internal/IgniteEx.java | 4 +- .../apache/ignite/internal/IgniteKernal.java | 285 +-- .../org/apache/ignite/internal/IgnitionEx.java | 12 +- .../org/apache/ignite/internal/SkipDaemon.java | 29 + .../ignite/internal/managers/GridManager.java | 20 +- .../internal/managers/GridManagerAdapter.java | 73 +- .../checkpoint/GridCheckpointManager.java | 4 +- .../collision/GridCollisionManager.java | 4 +- .../discovery/GridDiscoveryManager.java | 38 +- .../managers/indexing/GridIndexingManager.java | 4 +- .../swapspace/GridSwapSpaceManager.java | 4 +- .../internal/processors/GridProcessor.java | 11 +- .../processors/GridProcessorAdapter.java | 5 - .../processors/cache/GridCacheAdapter.java | 15 +- .../processors/cache/GridCacheProcessor.java | 60 +- .../processors/cache/GridCacheUtils.java | 4 +- .../distributed/near/GridNearCacheAdapter.java | 8 + .../clock/GridClockSyncProcessor.java | 9 +- .../internal/processors/hadoop/GridHadoop.java | 86 - .../hadoop/GridHadoopConfiguration.java | 172 -- .../processors/hadoop/GridHadoopCounter.java | 44 - .../hadoop/GridHadoopCounterWriter.java | 36 - .../processors/hadoop/GridHadoopCounters.java | 49 - .../processors/hadoop/GridHadoopFileBlock.java | 162 -- .../processors/hadoop/GridHadoopInputSplit.java | 54 - .../processors/hadoop/GridHadoopJob.java | 102 - .../processors/hadoop/GridHadoopJobId.java | 103 - .../processors/hadoop/GridHadoopJobInfo.java | 83 - .../processors/hadoop/GridHadoopJobPhase.java | 38 - .../hadoop/GridHadoopJobProperty.java | 138 - .../processors/hadoop/GridHadoopJobStatus.java | 207 -- .../hadoop/GridHadoopMapReducePlan.java | 80 - .../hadoop/GridHadoopMapReducePlanner.java | 40 - .../hadoop/GridHadoopPartitioner.java | 33 - .../hadoop/GridHadoopSerialization.java | 54 - .../processors/hadoop/GridHadoopTask.java | 72 - .../hadoop/GridHadoopTaskContext.java | 189 -- .../processors/hadoop/GridHadoopTaskInfo.java | 153 -- .../processors/hadoop/GridHadoopTaskInput.java | 55 - .../processors/hadoop/GridHadoopTaskOutput.java | 40 - .../processors/hadoop/GridHadoopTaskType.java | 56 - .../internal/processors/hadoop/Hadoop.java | 88 + .../processors/hadoop/HadoopFileBlock.java | 162 ++ .../processors/hadoop/HadoopInputSplit.java | 54 + .../internal/processors/hadoop/HadoopJob.java | 102 + .../internal/processors/hadoop/HadoopJobId.java | 103 + .../processors/hadoop/HadoopJobInfo.java | 83 + .../processors/hadoop/HadoopJobPhase.java | 38 + .../processors/hadoop/HadoopJobProperty.java | 138 + .../processors/hadoop/HadoopJobStatus.java | 207 ++ .../processors/hadoop/HadoopMapReducePlan.java | 80 + .../hadoop/HadoopMapReducePlanner.java | 40 + .../processors/hadoop/HadoopNoopProcessor.java | 76 + .../processors/hadoop/HadoopPartitioner.java | 33 + .../hadoop/HadoopProcessorAdapter.java | 96 + .../processors/hadoop/HadoopSerialization.java | 54 + .../internal/processors/hadoop/HadoopTask.java | 72 + .../processors/hadoop/HadoopTaskContext.java | 190 ++ .../processors/hadoop/HadoopTaskInfo.java | 153 ++ .../processors/hadoop/HadoopTaskInput.java | 55 + .../processors/hadoop/HadoopTaskOutput.java | 40 + .../processors/hadoop/HadoopTaskType.java | 56 + .../hadoop/IgniteHadoopNoopProcessor.java | 74 - .../hadoop/IgniteHadoopProcessorAdapter.java | 94 - .../hadoop/counter/HadoopCounter.java | 44 + .../hadoop/counter/HadoopCounterWriter.java | 37 + .../hadoop/counter/HadoopCounters.java | 49 + .../internal/processors/igfs/IgfsAsyncImpl.java | 9 +- .../internal/processors/igfs/IgfsContext.java | 6 +- .../processors/igfs/IgfsDataManager.java | 5 +- .../ignite/internal/processors/igfs/IgfsEx.java | 28 +- .../internal/processors/igfs/IgfsFileInfo.java | 4 +- .../internal/processors/igfs/IgfsImpl.java | 42 +- .../processors/igfs/IgfsInputStreamAdapter.java | 4 +- .../processors/igfs/IgfsInputStreamImpl.java | 5 +- .../internal/processors/igfs/IgfsJobImpl.java | 2 +- .../processors/igfs/IgfsMetaManager.java | 27 +- .../processors/igfs/IgfsNoopProcessor.java | 4 +- .../processors/igfs/IgfsOutputStreamImpl.java | 2 +- .../internal/processors/igfs/IgfsProcessor.java | 122 +- .../processors/igfs/IgfsProcessorAdapter.java | 4 +- .../igfs/IgfsSecondaryFileSystemImpl.java | 121 + .../IgfsSecondaryInputStreamDescriptor.java | 8 +- .../processors/igfs/IgfsServerManager.java | 4 +- .../processors/job/GridJobProcessor.java | 1 + .../processors/rest/GridRestProcessor.java | 64 +- .../streamer/GridStreamProcessor.java | 31 +- .../ignite/internal/util/IgniteUtils.java | 61 +- .../ignite/internal/visor/igfs/VisorIgfs.java | 4 +- .../visor/igfs/VisorIgfsProfilerClearTask.java | 2 +- .../visor/node/VisorGridConfiguration.java | 2 +- .../visor/node/VisorIgfsConfiguration.java | 9 +- .../visor/node/VisorNodeDataCollectorJob.java | 2 +- .../internal/visor/util/VisorTaskUtils.java | 4 +- .../apache/ignite/plugin/PluginProvider.java | 3 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 20 +- .../ignite/spi/IgniteSpiConsistencyChecked.java | 7 - .../hadoop/core-site-loopback-secondary.xml | 4 +- .../test/config/hadoop/core-site-loopback.xml | 4 +- .../test/config/hadoop/core-site-secondary.xml | 4 +- .../core/src/test/config/hadoop/core-site.xml | 4 +- modules/core/src/test/config/igfs-loopback.xml | 4 +- modules/core/src/test/config/igfs-shmem.xml | 4 +- .../ignite/igfs/IgfsEventsAbstractSelfTest.java | 10 +- .../igfs/IgfsFragmentizerAbstractSelfTest.java | 4 +- .../ignite/igfs/IgfsFragmentizerSelfTest.java | 8 +- .../igfs/IgfsFragmentizerTopologySelfTest.java | 2 +- .../cache/GridCacheConcurrentMapTest.java | 4 +- ...unctionExcludeNeighborsAbstractSelfTest.java | 1 + ...heIgfsPerBlockLruEvictionPolicySelfTest.java | 485 ---- .../processors/igfs/IgfsAbstractSelfTest.java | 85 +- ...sCachePerBlockLruEvictionPolicySelfTest.java | 485 ++++ .../processors/igfs/IgfsCacheSelfTest.java | 4 +- .../igfs/IgfsDataManagerSelfTest.java | 5 +- .../igfs/IgfsDualAbstractSelfTest.java | 12 +- .../igfs/IgfsMetaManagerSelfTest.java | 4 +- .../processors/igfs/IgfsMetricsSelfTest.java | 22 +- .../processors/igfs/IgfsModesSelfTest.java | 14 +- .../processors/igfs/IgfsProcessorSelfTest.java | 10 +- .../igfs/IgfsProcessorValidationSelfTest.java | 50 +- ...IpcEndpointRegistrationAbstractSelfTest.java | 10 +- ...dpointRegistrationOnLinuxAndMacSelfTest.java | 4 +- ...pcEndpointRegistrationOnWindowsSelfTest.java | 2 +- .../processors/igfs/IgfsSizeSelfTest.java | 4 +- .../processors/igfs/IgfsStreamsSelfTest.java | 14 +- .../processors/igfs/IgfsTaskSelfTest.java | 10 +- .../IgfsAbstractRecordResolverSelfTest.java | 6 +- .../ipc/shmem/IpcSharedMemoryNodeStartup.java | 4 +- .../ignite/testframework/junits/IgniteMock.java | 4 +- .../ignite/testsuites/IgniteIgfsTestSuite.java | 2 +- .../client/hadoop/GridHadoopClientProtocol.java | 334 --- .../GridHadoopClientProtocolProvider.java | 137 - .../counter/GridHadoopClientCounterGroup.java | 121 - .../counter/GridHadoopClientCounters.java | 217 -- .../apache/ignite/client/hadoop/package.html | 24 - .../fs/IgniteHadoopFileSystemCounterWriter.java | 93 + .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 416 +++ .../org/apache/ignite/hadoop/fs/package.html | 24 + .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 1237 +++++++++ .../org/apache/ignite/hadoop/fs/v1/package.html | 24 + .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 982 ++++++++ .../org/apache/ignite/hadoop/fs/v2/package.html | 24 + .../IgniteHadoopClientProtocolProvider.java | 138 + .../mapreduce/IgniteHadoopMapReducePlanner.java | 435 ++++ .../apache/ignite/hadoop/mapreduce/package.html | 24 + .../java/org/apache/ignite/hadoop/package.html | 24 + .../hadoop/IgfsHadoopFileSystemWrapper.java | 414 --- .../igfs/hadoop/IgfsHadoopParameters.java | 94 - .../org/apache/ignite/igfs/hadoop/package.html | 24 - .../igfs/hadoop/v1/IgfsHadoopFileSystem.java | 1234 --------- .../apache/ignite/igfs/hadoop/v1/package.html | 24 - .../igfs/hadoop/v2/IgfsHadoopFileSystem.java | 982 -------- .../apache/ignite/igfs/hadoop/v2/package.html | 24 - .../java/org/apache/ignite/igfs/package.html | 24 - .../ignite/internal/igfs/hadoop/IgfsHadoop.java | 198 -- .../IgfsHadoopCommunicationException.java | 57 - .../igfs/hadoop/IgfsHadoopEndpoint.java | 210 -- .../internal/igfs/hadoop/IgfsHadoopEx.java | 88 - .../igfs/hadoop/IgfsHadoopFSProperties.java | 88 - .../internal/igfs/hadoop/IgfsHadoopFuture.java | 94 - .../internal/igfs/hadoop/IgfsHadoopInProc.java | 409 --- .../igfs/hadoop/IgfsHadoopInputStream.java | 626 ----- .../internal/igfs/hadoop/IgfsHadoopIo.java | 76 - .../internal/igfs/hadoop/IgfsHadoopIpcIo.java | 599 ----- .../igfs/hadoop/IgfsHadoopIpcIoListener.java | 36 - .../igfs/hadoop/IgfsHadoopJclLogger.java | 112 - .../internal/igfs/hadoop/IgfsHadoopOutProc.java | 466 ---- .../igfs/hadoop/IgfsHadoopOutputStream.java | 201 -- .../igfs/hadoop/IgfsHadoopProxyInputStream.java | 335 --- .../hadoop/IgfsHadoopProxyOutputStream.java | 165 -- .../internal/igfs/hadoop/IgfsHadoopReader.java | 104 - .../igfs/hadoop/IgfsHadoopStreamDelegate.java | 96 - .../hadoop/IgfsHadoopStreamEventListener.java | 39 - .../internal/igfs/hadoop/IgfsHadoopUtils.java | 131 - .../internal/igfs/hadoop/IgfsHadoopWrapper.java | 511 ---- .../ignite/internal/igfs/hadoop/package.html | 24 - .../apache/ignite/internal/igfs/package.html | 24 - .../hadoop/GridHadoopClassLoader.java | 552 ---- .../processors/hadoop/GridHadoopComponent.java | 61 - .../processors/hadoop/GridHadoopContext.java | 196 -- .../hadoop/GridHadoopDefaultJobInfo.java | 163 -- .../processors/hadoop/GridHadoopImpl.java | 132 - .../processors/hadoop/GridHadoopSetup.java | 505 ---- .../GridHadoopTaskCancelledException.java | 35 - .../processors/hadoop/GridHadoopUtils.java | 308 --- .../processors/hadoop/HadoopClassLoader.java | 552 ++++ .../processors/hadoop/HadoopComponent.java | 61 + .../processors/hadoop/HadoopContext.java | 197 ++ .../processors/hadoop/HadoopDefaultJobInfo.java | 163 ++ .../internal/processors/hadoop/HadoopImpl.java | 134 + .../hadoop/HadoopMapReduceCounterGroup.java | 121 + .../hadoop/HadoopMapReduceCounters.java | 216 ++ .../processors/hadoop/HadoopProcessor.java | 227 ++ .../internal/processors/hadoop/HadoopSetup.java | 505 ++++ .../hadoop/HadoopTaskCancelledException.java | 35 + .../internal/processors/hadoop/HadoopUtils.java | 308 +++ .../hadoop/IgniteHadoopProcessor.java | 225 -- .../counter/GridHadoopCounterAdapter.java | 128 - .../hadoop/counter/GridHadoopCountersImpl.java | 198 -- .../counter/GridHadoopFSCounterWriter.java | 91 - .../hadoop/counter/GridHadoopLongCounter.java | 92 - .../counter/GridHadoopPerformanceCounter.java | 279 --- .../hadoop/counter/HadoopCounterAdapter.java | 127 + .../hadoop/counter/HadoopCountersImpl.java | 197 ++ .../hadoop/counter/HadoopLongCounter.java | 90 + .../counter/HadoopPerformanceCounter.java | 279 +++ .../fs/GridHadoopDistributedFileSystem.java | 91 - .../hadoop/fs/GridHadoopFileSystemsUtils.java | 57 - .../hadoop/fs/GridHadoopLocalFileSystemV1.java | 39 - .../hadoop/fs/GridHadoopLocalFileSystemV2.java | 86 - .../hadoop/fs/GridHadoopRawLocalFileSystem.java | 304 --- .../hadoop/fs/HadoopDistributedFileSystem.java | 91 + .../hadoop/fs/HadoopFileSystemsUtils.java | 57 + .../hadoop/fs/HadoopLocalFileSystemV1.java | 39 + .../hadoop/fs/HadoopLocalFileSystemV2.java | 86 + .../processors/hadoop/fs/HadoopParameters.java | 94 + .../hadoop/fs/HadoopRawLocalFileSystem.java | 304 +++ .../processors/hadoop/igfs/HadoopIgfs.java | 198 ++ .../igfs/HadoopIgfsCommunicationException.java | 57 + .../hadoop/igfs/HadoopIgfsEndpoint.java | 210 ++ .../processors/hadoop/igfs/HadoopIgfsEx.java | 88 + .../hadoop/igfs/HadoopIgfsFuture.java | 94 + .../hadoop/igfs/HadoopIgfsInProc.java | 409 +++ .../hadoop/igfs/HadoopIgfsInputStream.java | 626 +++++ .../processors/hadoop/igfs/HadoopIgfsIo.java | 76 + .../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 599 +++++ .../hadoop/igfs/HadoopIgfsIpcIoListener.java | 36 + .../hadoop/igfs/HadoopIgfsJclLogger.java | 115 + .../hadoop/igfs/HadoopIgfsOutProc.java | 466 ++++ .../hadoop/igfs/HadoopIgfsOutputStream.java | 201 ++ .../hadoop/igfs/HadoopIgfsProperties.java | 88 + .../hadoop/igfs/HadoopIgfsProxyInputStream.java | 335 +++ .../igfs/HadoopIgfsProxyOutputStream.java | 165 ++ ...fsSecondaryFileSystemPositionedReadable.java | 104 + .../hadoop/igfs/HadoopIgfsStreamDelegate.java | 96 + .../igfs/HadoopIgfsStreamEventListener.java | 39 + .../processors/hadoop/igfs/HadoopIgfsUtils.java | 131 + .../hadoop/igfs/HadoopIgfsWrapper.java | 511 ++++ .../jobtracker/GridHadoopJobMetadata.java | 305 --- .../hadoop/jobtracker/GridHadoopJobTracker.java | 1625 ------------ .../hadoop/jobtracker/HadoopJobMetadata.java | 306 +++ .../hadoop/jobtracker/HadoopJobTracker.java | 1626 ++++++++++++ .../hadoop/message/GridHadoopMessage.java | 27 - .../hadoop/message/HadoopMessage.java | 27 + .../planner/GridHadoopDefaultMapReducePlan.java | 107 - .../GridHadoopDefaultMapReducePlanner.java | 434 ---- .../planner/HadoopDefaultMapReducePlan.java | 107 + .../GridHadoopProtocolJobCountersTask.java | 45 - .../proto/GridHadoopProtocolJobStatusTask.java | 81 - .../proto/GridHadoopProtocolKillJobTask.java | 46 - .../proto/GridHadoopProtocolNextTaskIdTask.java | 35 - .../proto/GridHadoopProtocolSubmitJobTask.java | 57 - .../proto/GridHadoopProtocolTaskAdapter.java | 113 - .../proto/GridHadoopProtocolTaskArguments.java | 81 - .../hadoop/proto/HadoopClientProtocol.java | 333 +++ .../proto/HadoopProtocolJobCountersTask.java | 46 + .../proto/HadoopProtocolJobStatusTask.java | 81 + .../hadoop/proto/HadoopProtocolKillJobTask.java | 46 + .../proto/HadoopProtocolNextTaskIdTask.java | 35 + .../proto/HadoopProtocolSubmitJobTask.java | 57 + .../hadoop/proto/HadoopProtocolTaskAdapter.java | 113 + .../proto/HadoopProtocolTaskArguments.java | 81 + .../hadoop/shuffle/GridHadoopShuffle.java | 256 -- .../hadoop/shuffle/GridHadoopShuffleAck.java | 91 - .../hadoop/shuffle/GridHadoopShuffleJob.java | 593 ----- .../shuffle/GridHadoopShuffleMessage.java | 242 -- .../hadoop/shuffle/HadoopShuffle.java | 256 ++ .../hadoop/shuffle/HadoopShuffleAck.java | 91 + .../hadoop/shuffle/HadoopShuffleJob.java | 593 +++++ .../hadoop/shuffle/HadoopShuffleMessage.java | 241 ++ .../GridHadoopConcurrentHashMultimap.java | 611 ----- .../collections/GridHadoopHashMultimap.java | 174 -- .../collections/GridHadoopHashMultimapBase.java | 208 -- .../shuffle/collections/GridHadoopMultimap.java | 112 - .../collections/GridHadoopMultimapBase.java | 368 --- .../shuffle/collections/GridHadoopSkipList.java | 726 ------ .../HadoopConcurrentHashMultimap.java | 611 +++++ .../shuffle/collections/HadoopHashMultimap.java | 174 ++ .../collections/HadoopHashMultimapBase.java | 208 ++ .../shuffle/collections/HadoopMultimap.java | 112 + .../shuffle/collections/HadoopMultimapBase.java | 368 +++ .../shuffle/collections/HadoopSkipList.java | 726 ++++++ .../shuffle/streams/GridHadoopDataInStream.java | 170 -- .../streams/GridHadoopDataOutStream.java | 131 - .../streams/GridHadoopOffheapBuffer.java | 122 - .../shuffle/streams/HadoopDataInStream.java | 170 ++ .../shuffle/streams/HadoopDataOutStream.java | 131 + .../shuffle/streams/HadoopOffheapBuffer.java | 122 + .../GridHadoopEmbeddedTaskExecutor.java | 146 -- .../taskexecutor/GridHadoopExecutorService.java | 232 -- .../taskexecutor/GridHadoopRunnableTask.java | 268 -- .../GridHadoopTaskExecutorAdapter.java | 57 - .../taskexecutor/GridHadoopTaskState.java | 38 - .../taskexecutor/GridHadoopTaskStatus.java | 114 - .../HadoopEmbeddedTaskExecutor.java | 146 ++ .../taskexecutor/HadoopExecutorService.java | 231 ++ .../hadoop/taskexecutor/HadoopRunnableTask.java | 268 ++ .../taskexecutor/HadoopTaskExecutorAdapter.java | 57 + .../hadoop/taskexecutor/HadoopTaskState.java | 38 + .../hadoop/taskexecutor/HadoopTaskStatus.java | 114 + .../GridHadoopExternalTaskExecutor.java | 960 ------- .../GridHadoopExternalTaskMetadata.java | 68 - .../GridHadoopJobInfoUpdateRequest.java | 109 - .../GridHadoopPrepareForJobRequest.java | 126 - .../external/GridHadoopProcessDescriptor.java | 150 -- .../external/GridHadoopProcessStartedAck.java | 46 - .../GridHadoopTaskExecutionRequest.java | 110 - .../external/GridHadoopTaskFinishedMessage.java | 92 - .../external/HadoopExternalTaskExecutor.java | 960 +++++++ .../external/HadoopExternalTaskMetadata.java | 68 + .../external/HadoopJobInfoUpdateRequest.java | 109 + .../external/HadoopPrepareForJobRequest.java | 126 + .../external/HadoopProcessDescriptor.java | 150 ++ .../external/HadoopProcessStartedAck.java | 46 + .../external/HadoopTaskExecutionRequest.java | 110 + .../external/HadoopTaskFinishedMessage.java | 92 + .../child/GridHadoopChildProcessRunner.java | 440 ---- .../child/GridHadoopExternalProcessStarter.java | 296 --- .../child/HadoopChildProcessRunner.java | 440 ++++ .../child/HadoopExternalProcessStarter.java | 296 +++ .../GridHadoopAbstractCommunicationClient.java | 96 - .../GridHadoopCommunicationClient.java | 72 - .../GridHadoopExternalCommunication.java | 1431 ----------- .../GridHadoopHandshakeTimeoutException.java | 42 - .../GridHadoopIpcToNioAdapter.java | 239 -- .../GridHadoopMarshallerFilter.java | 84 - .../GridHadoopMessageListener.java | 39 - .../GridHadoopTcpNioCommunicationClient.java | 99 - .../HadoopAbstractCommunicationClient.java | 96 + .../HadoopCommunicationClient.java | 72 + .../HadoopExternalCommunication.java | 1431 +++++++++++ .../HadoopHandshakeTimeoutException.java | 42 + .../communication/HadoopIpcToNioAdapter.java | 239 ++ .../communication/HadoopMarshallerFilter.java | 84 + .../communication/HadoopMessageListener.java | 39 + .../HadoopTcpNioCommunicationClient.java | 99 + .../hadoop/v1/GridHadoopV1CleanupTask.java | 62 - .../hadoop/v1/GridHadoopV1Counter.java | 105 - .../hadoop/v1/GridHadoopV1MapTask.java | 111 - .../hadoop/v1/GridHadoopV1OutputCollector.java | 130 - .../hadoop/v1/GridHadoopV1Partitioner.java | 44 - .../hadoop/v1/GridHadoopV1ReduceTask.java | 92 - .../hadoop/v1/GridHadoopV1Reporter.java | 79 - .../hadoop/v1/GridHadoopV1SetupTask.java | 56 - .../hadoop/v1/GridHadoopV1Splitter.java | 97 - .../processors/hadoop/v1/GridHadoopV1Task.java | 95 - .../hadoop/v1/HadoopV1CleanupTask.java | 62 + .../processors/hadoop/v1/HadoopV1Counter.java | 105 + .../processors/hadoop/v1/HadoopV1MapTask.java | 111 + .../hadoop/v1/HadoopV1OutputCollector.java | 130 + .../hadoop/v1/HadoopV1Partitioner.java | 44 + .../hadoop/v1/HadoopV1ReduceTask.java | 92 + .../processors/hadoop/v1/HadoopV1Reporter.java | 79 + .../processors/hadoop/v1/HadoopV1SetupTask.java | 56 + .../processors/hadoop/v1/HadoopV1Splitter.java | 97 + .../processors/hadoop/v1/HadoopV1Task.java | 95 + .../hadoop/v2/GridHadoopExternalSplit.java | 87 - .../hadoop/v2/GridHadoopNativeCodeLoader.java | 74 - .../v2/GridHadoopSerializationWrapper.java | 133 - .../v2/GridHadoopShutdownHookManager.java | 96 - .../hadoop/v2/GridHadoopSplitWrapper.java | 118 - .../hadoop/v2/GridHadoopV2CleanupTask.java | 73 - .../hadoop/v2/GridHadoopV2Context.java | 230 -- .../hadoop/v2/GridHadoopV2Counter.java | 87 - .../processors/hadoop/v2/GridHadoopV2Job.java | 280 --- .../v2/GridHadoopV2JobResourceManager.java | 305 --- .../hadoop/v2/GridHadoopV2MapTask.java | 109 - .../hadoop/v2/GridHadoopV2Partitioner.java | 44 - .../hadoop/v2/GridHadoopV2ReduceTask.java | 88 - .../hadoop/v2/GridHadoopV2SetupTask.java | 66 - .../hadoop/v2/GridHadoopV2Splitter.java | 105 - .../processors/hadoop/v2/GridHadoopV2Task.java | 181 -- .../hadoop/v2/GridHadoopV2TaskContext.java | 443 ---- .../v2/GridHadoopWritableSerialization.java | 74 - .../hadoop/v2/HadoopExternalSplit.java | 87 + .../hadoop/v2/HadoopNativeCodeLoader.java | 74 + .../hadoop/v2/HadoopSerializationWrapper.java | 133 + .../hadoop/v2/HadoopShutdownHookManager.java | 96 + .../hadoop/v2/HadoopSplitWrapper.java | 118 + .../hadoop/v2/HadoopV2CleanupTask.java | 73 + .../processors/hadoop/v2/HadoopV2Context.java | 230 ++ .../processors/hadoop/v2/HadoopV2Counter.java | 87 + .../processors/hadoop/v2/HadoopV2Job.java | 280 +++ .../hadoop/v2/HadoopV2JobResourceManager.java | 305 +++ .../processors/hadoop/v2/HadoopV2MapTask.java | 109 + .../hadoop/v2/HadoopV2Partitioner.java | 44 + .../hadoop/v2/HadoopV2ReduceTask.java | 88 + .../processors/hadoop/v2/HadoopV2SetupTask.java | 66 + .../processors/hadoop/v2/HadoopV2Splitter.java | 105 + .../processors/hadoop/v2/HadoopV2Task.java | 181 ++ .../hadoop/v2/HadoopV2TaskContext.java | 444 ++++ .../hadoop/v2/HadoopWritableSerialization.java | 74 + ...op.mapreduce.protocol.ClientProtocolProvider | 2 +- ...ridHadoopClientProtocolEmbeddedSelfTest.java | 34 - .../GridHadoopClientProtocolSelfTest.java | 633 ----- .../HadoopClientProtocolEmbeddedSelfTest.java | 34 + .../hadoop/HadoopClientProtocolSelfTest.java | 635 +++++ .../HadoopIgfs20FileSystemAbstractSelfTest.java | 1967 +++++++++++++++ ...Igfs20FileSystemLoopbackPrimarySelfTest.java | 74 + ...oopIgfs20FileSystemShmemPrimarySelfTest.java | 74 + .../igfs/HadoopIgfsDualAbstractSelfTest.java | 305 +++ .../igfs/HadoopIgfsDualAsyncSelfTest.java | 32 + .../ignite/igfs/HadoopIgfsDualSyncSelfTest.java | 32 + ...oopSecondaryFileSystemConfigurationTest.java | 44 +- .../apache/ignite/igfs/IgfsEventsTestSuite.java | 54 +- .../IgfsHadoop20FileSystemAbstractSelfTest.java | 1967 --------------- ...doop20FileSystemLoopbackPrimarySelfTest.java | 74 - ...sHadoop20FileSystemShmemPrimarySelfTest.java | 74 - .../igfs/IgfsHadoopDualAbstractSelfTest.java | 304 --- .../igfs/IgfsHadoopDualAsyncSelfTest.java | 32 - .../ignite/igfs/IgfsHadoopDualSyncSelfTest.java | 32 - .../IgfsHadoopFileSystemAbstractSelfTest.java | 2366 ------------------ .../IgfsHadoopFileSystemClientSelfTest.java | 199 -- .../IgfsHadoopFileSystemHandshakeSelfTest.java | 311 --- .../IgfsHadoopFileSystemIpcCacheSelfTest.java | 207 -- .../IgfsHadoopFileSystemLoggerSelfTest.java | 287 --- ...IgfsHadoopFileSystemLoggerStateSelfTest.java | 325 --- ...adoopFileSystemLoopbackAbstractSelfTest.java | 46 - ...SystemLoopbackEmbeddedDualAsyncSelfTest.java | 33 - ...eSystemLoopbackEmbeddedDualSyncSelfTest.java | 33 - ...leSystemLoopbackEmbeddedPrimarySelfTest.java | 33 - ...SystemLoopbackEmbeddedSecondarySelfTest.java | 34 - ...SystemLoopbackExternalDualAsyncSelfTest.java | 33 - ...eSystemLoopbackExternalDualSyncSelfTest.java | 33 - ...leSystemLoopbackExternalPrimarySelfTest.java | 33 - ...SystemLoopbackExternalSecondarySelfTest.java | 34 - ...fsHadoopFileSystemSecondaryModeSelfTest.java | 319 --- ...fsHadoopFileSystemShmemAbstractSelfTest.java | 88 - ...ileSystemShmemEmbeddedDualAsyncSelfTest.java | 33 - ...FileSystemShmemEmbeddedDualSyncSelfTest.java | 33 - ...pFileSystemShmemEmbeddedPrimarySelfTest.java | 33 - ...ileSystemShmemEmbeddedSecondarySelfTest.java | 33 - ...ileSystemShmemExternalDualAsyncSelfTest.java | 33 - ...FileSystemShmemExternalDualSyncSelfTest.java | 33 - ...pFileSystemShmemExternalPrimarySelfTest.java | 33 - ...ileSystemShmemExternalSecondarySelfTest.java | 33 - .../igfs/IgfsNearOnlyMultiNodeSelfTest.java | 4 +- .../IgniteHadoopFileSystemAbstractSelfTest.java | 2366 ++++++++++++++++++ .../IgniteHadoopFileSystemClientSelfTest.java | 199 ++ ...IgniteHadoopFileSystemHandshakeSelfTest.java | 311 +++ .../IgniteHadoopFileSystemIpcCacheSelfTest.java | 207 ++ .../IgniteHadoopFileSystemLoggerSelfTest.java | 287 +++ ...niteHadoopFileSystemLoggerStateSelfTest.java | 325 +++ ...adoopFileSystemLoopbackAbstractSelfTest.java | 46 + ...SystemLoopbackEmbeddedDualAsyncSelfTest.java | 33 + ...eSystemLoopbackEmbeddedDualSyncSelfTest.java | 33 + ...leSystemLoopbackEmbeddedPrimarySelfTest.java | 33 + ...SystemLoopbackEmbeddedSecondarySelfTest.java | 34 + ...SystemLoopbackExternalDualAsyncSelfTest.java | 33 + ...eSystemLoopbackExternalDualSyncSelfTest.java | 33 + ...leSystemLoopbackExternalPrimarySelfTest.java | 33 + ...SystemLoopbackExternalSecondarySelfTest.java | 34 + ...teHadoopFileSystemSecondaryModeSelfTest.java | 319 +++ ...teHadoopFileSystemShmemAbstractSelfTest.java | 88 + ...ileSystemShmemEmbeddedDualAsyncSelfTest.java | 33 + ...FileSystemShmemEmbeddedDualSyncSelfTest.java | 33 + ...pFileSystemShmemEmbeddedPrimarySelfTest.java | 33 + ...ileSystemShmemEmbeddedSecondarySelfTest.java | 33 + ...ileSystemShmemExternalDualAsyncSelfTest.java | 33 + ...FileSystemShmemExternalDualSyncSelfTest.java | 33 + ...pFileSystemShmemExternalPrimarySelfTest.java | 33 + ...ileSystemShmemExternalSecondarySelfTest.java | 33 + .../hadoop/GridHadoopAbstractSelfTest.java | 222 -- .../hadoop/GridHadoopAbstractWordCountTest.java | 138 - .../hadoop/GridHadoopClassLoaderTest.java | 69 - .../hadoop/GridHadoopCommandLineTest.java | 440 ---- ...idHadoopDefaultMapReducePlannerSelfTest.java | 1005 -------- .../hadoop/GridHadoopFileSystemsTest.java | 177 -- .../hadoop/GridHadoopGroupingTest.java | 286 --- .../hadoop/GridHadoopJobTrackerSelfTest.java | 330 --- .../GridHadoopMapReduceEmbeddedSelfTest.java | 245 -- .../hadoop/GridHadoopMapReduceTest.java | 195 -- .../hadoop/GridHadoopPopularWordsTest.java | 294 --- .../GridHadoopSerializationWrapperSelfTest.java | 74 - .../processors/hadoop/GridHadoopSharedMap.java | 67 - .../hadoop/GridHadoopSortingExternalTest.java | 44 - .../hadoop/GridHadoopSortingTest.java | 281 --- .../hadoop/GridHadoopSplitWrapperSelfTest.java | 68 - .../processors/hadoop/GridHadoopStartup.java | 55 - .../hadoop/GridHadoopTaskExecutionSelfTest.java | 551 ---- .../hadoop/GridHadoopTasksAllVersionsTest.java | 259 -- .../hadoop/GridHadoopTasksV1Test.java | 57 - .../hadoop/GridHadoopTasksV2Test.java | 75 - .../GridHadoopTestRoundRobinMrPlanner.java | 66 - .../hadoop/GridHadoopTestTaskContext.java | 219 -- .../processors/hadoop/GridHadoopTestUtils.java | 102 - .../hadoop/GridHadoopV2JobSelfTest.java | 88 - .../hadoop/GridHadoopValidationSelfTest.java | 53 - .../hadoop/HadoopAbstractSelfTest.java | 222 ++ .../hadoop/HadoopAbstractWordCountTest.java | 138 + .../hadoop/HadoopClassLoaderTest.java | 69 + .../hadoop/HadoopCommandLineTest.java | 440 ++++ .../HadoopDefaultMapReducePlannerSelfTest.java | 1006 ++++++++ .../hadoop/HadoopFileSystemsTest.java | 177 ++ .../processors/hadoop/HadoopGroupingTest.java | 287 +++ .../hadoop/HadoopJobTrackerSelfTest.java | 331 +++ .../hadoop/HadoopMapReduceEmbeddedSelfTest.java | 246 ++ .../processors/hadoop/HadoopMapReduceTest.java | 197 ++ .../hadoop/HadoopPopularWordsTest.java | 294 +++ .../HadoopSerializationWrapperSelfTest.java | 74 + .../processors/hadoop/HadoopSharedMap.java | 67 + .../hadoop/HadoopSortingExternalTest.java | 44 + .../processors/hadoop/HadoopSortingTest.java | 282 +++ .../hadoop/HadoopSplitWrapperSelfTest.java | 68 + .../processors/hadoop/HadoopStartup.java | 54 + .../hadoop/HadoopTaskExecutionSelfTest.java | 551 ++++ .../hadoop/HadoopTasksAllVersionsTest.java | 259 ++ .../processors/hadoop/HadoopTasksV1Test.java | 57 + .../processors/hadoop/HadoopTasksV2Test.java | 75 + .../hadoop/HadoopTestRoundRobinMrPlanner.java | 66 + .../hadoop/HadoopTestTaskContext.java | 219 ++ .../processors/hadoop/HadoopTestUtils.java | 102 + .../processors/hadoop/HadoopV2JobSelfTest.java | 88 + .../hadoop/HadoopValidationSelfTest.java | 53 + .../hadoop/examples/GridHadoopWordCount1.java | 88 - .../examples/GridHadoopWordCount1Map.java | 62 - .../examples/GridHadoopWordCount1Reduce.java | 51 - .../hadoop/examples/GridHadoopWordCount2.java | 95 - .../examples/GridHadoopWordCount2Mapper.java | 72 - .../examples/GridHadoopWordCount2Reducer.java | 70 - .../hadoop/examples/HadoopWordCount1.java | 88 + .../hadoop/examples/HadoopWordCount1Map.java | 62 + .../hadoop/examples/HadoopWordCount1Reduce.java | 51 + .../hadoop/examples/HadoopWordCount2.java | 95 + .../hadoop/examples/HadoopWordCount2Mapper.java | 72 + .../examples/HadoopWordCount2Reducer.java | 70 + .../collections/GridHadoopAbstractMapTest.java | 152 -- ...ridHadoopConcurrentHashMultimapSelftest.java | 267 -- .../collections/GridHadoopHashMapSelfTest.java | 170 -- .../collections/GridHadoopSkipListSelfTest.java | 303 --- .../collections/HadoopAbstractMapTest.java | 154 ++ .../HadoopConcurrentHashMultimapSelftest.java | 267 ++ .../collections/HadoopHashMapSelfTest.java | 170 ++ .../collections/HadoopSkipListSelfTest.java | 303 +++ .../streams/GridHadoopDataStreamSelfTest.java | 151 -- .../streams/HadoopDataStreamSelfTest.java | 151 ++ .../GridHadoopExecutorServiceTest.java | 119 - .../taskexecutor/HadoopExecutorServiceTest.java | 119 + ...GridHadoopExternalTaskExecutionSelfTest.java | 221 -- .../HadoopExternalTaskExecutionSelfTest.java | 221 ++ ...GridHadoopExternalCommunicationSelfTest.java | 209 -- .../HadoopExternalCommunicationSelfTest.java | 209 ++ .../testsuites/IgniteHadoopTestSuite.java | 82 +- .../IgniteIgfsLinuxAndMacOSTestSuite.java | 22 +- .../parser/dialect/JdbcMetadataDialect.java | 95 +- .../parser/dialect/OracleMetadataDialect.java | 66 +- .../org/apache/ignite/schema/ui/MessageBox.java | 7 + .../org/apache/ignite/IgniteSpringBean.java | 4 +- .../scala/org/apache/ignite/visor/visor.scala | 4 +- pom.xml | 12 +- 587 files changed, 48204 insertions(+), 47810 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index b46d0a2,81bbf89..35fee7c --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@@ -678,9 -671,9 +673,11 @@@ public class IgniteKernal implements Ig igfsExecSvc, restExecSvc); + cfg.getMarshaller().setContext(ctx.marshallerContext()); + - startProcessor(ctx, new ClusterProcessor(ctx), attrs); + startProcessor(new ClusterProcessor(ctx)); + + fillNodeAttributes(); U.onGridStart(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index dd08eb0,e76c922..62c651e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -902,12 -902,32 +902,38 @@@ public class GridCacheProcessor extend transactions = new IgniteTransactionsImpl(sharedCtx); + if (!(ctx.isDaemon() || F.isEmpty(ctx.config().getCacheConfiguration()))) { + GridCacheAttributes[] attrVals = new GridCacheAttributes[ctx.config().getCacheConfiguration().length]; + + Map<String, String> interceptors = new HashMap<>(); + + int i = 0; + + for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { + assert caches.containsKey(cfg.getName()) : cfg.getName(); + + GridCacheContext ctx = caches.get(cfg.getName()).context(); + + attrVals[i++] = new GridCacheAttributes(cfg, ctx.store().configuredStore()); + + if (cfg.getInterceptor() != null) + interceptors.put(cfg.getName(), cfg.getInterceptor().getClass().getName()); + } + + ctx.addNodeAttribute(ATTR_CACHE, attrVals); + + ctx.addNodeAttribute(ATTR_TX_CONFIG, ctx.config().getTransactionConfiguration()); + + if (!interceptors.isEmpty()) + ctx.addNodeAttribute(ATTR_CACHE_INTERCEPTORS, interceptors); + } + + marshallerCache().context().preloader().syncFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + ctx.marshallerContext().onMarshallerCacheReady(ctx); + } + }); + if (log.isDebugEnabled()) log.debug("Started cache processor."); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java index 0000000,83053ce..5c52f94 mode 000000,100644..100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java @@@ -1,0 -1,138 +1,138 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.hadoop.mapreduce; + + import org.apache.hadoop.conf.*; + import org.apache.hadoop.mapreduce.*; + import org.apache.hadoop.mapreduce.protocol.*; + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.client.*; -import org.apache.ignite.internal.client.marshaller.optimized.*; ++import org.apache.ignite.internal.client.marshaller.jdk.*; + import org.apache.ignite.internal.processors.hadoop.proto.*; + import org.apache.ignite.internal.util.future.*; + import org.apache.ignite.internal.util.typedef.*; + + import java.io.*; + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + + import static org.apache.ignite.internal.client.GridClientProtocol.*; + import static org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol.*; + + + /** + * Ignite Hadoop client protocol provider. + */ + public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { + /** Clients. */ + private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public ClientProtocol create(Configuration conf) throws IOException { + if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { + String addr = conf.get(MRConfig.MASTER_ADDRESS); + + if (F.isEmpty(addr)) + throw new IOException("Failed to create client protocol because server address is not specified (is " + + MRConfig.MASTER_ADDRESS + " property set?)."); + + if (F.eq(addr, "local")) + throw new IOException("Local execution mode is not supported, please point " + + MRConfig.MASTER_ADDRESS + " to real Ignite node."); + + return createProtocol(addr, conf); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException { + if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) + return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf); + + return null; + } + + /** {@inheritDoc} */ + @Override public void close(ClientProtocol cliProto) throws IOException { + // No-op. + } + + /** + * Internal protocol creation routine. + * + * @param addr Address. + * @param conf Configuration. + * @return Client protocol. + * @throws IOException If failed. + */ + private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException { + return new HadoopClientProtocol(conf, client(addr)); + } + + /** + * Create client. + * + * @param addr Endpoint address. + * @return Client. + * @throws IOException If failed. + */ + private static GridClient client(String addr) throws IOException { + try { + IgniteInternalFuture<GridClient> fut = cliMap.get(addr); + + if (fut == null) { + GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>(); + + IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0); + + if (oldFut != null) + return oldFut.get(); + else { + GridClientConfiguration cliCfg = new GridClientConfiguration(); + + cliCfg.setProtocol(TCP); + cliCfg.setServers(Collections.singletonList(addr)); - cliCfg.setMarshaller(new GridClientOptimizedMarshaller()); ++ cliCfg.setMarshaller(new GridClientJdkMarshaller()); + cliCfg.setDaemon(true); + + try { + GridClient cli = GridClientFactory.start(cliCfg); + + fut0.onDone(cli); + + return cli; + } + catch (GridClientException e) { + fut0.onDone(e); + + throw new IOException("Failed to establish connection with Ignite node: " + addr, e); + } + } + } + else + return fut.get(); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to establish connection with Ignite node: " + addr, e); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java index 0000000,3a94d43..a449bfd mode 000000,100644..100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java @@@ -1,0 -1,296 +1,296 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child; + + import org.apache.ignite.*; + import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; + import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.logger.log4j.*; -import org.apache.ignite.marshaller.optimized.*; ++import org.apache.ignite.marshaller.jdk.*; + + import java.io.*; + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + + /** + * Hadoop external process base class. + */ + public class HadoopExternalProcessStarter { + /** Path to Log4j configuration file. */ + public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml"; + + /** Arguments. */ + private Args args; + + /** System out. */ + private OutputStream out; + + /** System err. */ + private OutputStream err; + + /** + * @param args Parsed arguments. + */ + public HadoopExternalProcessStarter(Args args) { + this.args = args; + } + + /** + * @param cmdArgs Process arguments. + */ + public static void main(String[] cmdArgs) { + try { + Args args = arguments(cmdArgs); + + new HadoopExternalProcessStarter(args).run(); + } + catch (Exception e) { + System.err.println("Failed"); + + System.err.println(e.getMessage()); + + e.printStackTrace(System.err); + } + } + + /** + * + * @throws Exception + */ + public void run() throws Exception { + U.setWorkDirectory(args.workDir, U.getIgniteHome()); + + File outputDir = outputDirectory(); + + initializeStreams(outputDir); + + ExecutorService msgExecSvc = Executors.newFixedThreadPool( + Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2)); + + IgniteLogger log = logger(outputDir); + + HadoopExternalCommunication comm = new HadoopExternalCommunication( + args.nodeId, + args.childProcId, - new OptimizedMarshaller(), ++ new JdkMarshaller(), + log, + msgExecSvc, + "external" + ); + + comm.start(); + + HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId); + nodeDesc.address(args.addr); + nodeDesc.tcpPort(args.tcpPort); + nodeDesc.sharedMemoryPort(args.shmemPort); + + HadoopChildProcessRunner runner = new HadoopChildProcessRunner(); + + runner.start(comm, nodeDesc, msgExecSvc, log); + + System.err.println("Started"); + System.err.flush(); + + System.setOut(new PrintStream(out)); + System.setErr(new PrintStream(err)); + } + + /** + * @param outputDir Directory for process output. + * @throws Exception + */ + private void initializeStreams(File outputDir) throws Exception { + out = new FileOutputStream(new File(outputDir, args.childProcId + ".out")); + err = new FileOutputStream(new File(outputDir, args.childProcId + ".err")); + } + + /** + * @return Path to output directory. + * @throws IOException If failed. + */ + private File outputDirectory() throws IOException { + File f = new File(args.out); + + if (!f.exists()) { + if (!f.mkdirs()) + throw new IOException("Failed to create output directory: " + args.out); + } + else { + if (f.isFile()) + throw new IOException("Output directory is a file: " + args.out); + } + + return f; + } + + /** + * @param outputDir Directory for process output. + * @return Logger. + */ + private IgniteLogger logger(final File outputDir) { + final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG); + + Log4JLogger logger; + + try { + logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true); + } + catch (IgniteCheckedException e) { + System.err.println("Failed to create URL-based logger. Will use default one."); + + e.printStackTrace(); + + logger = new Log4JLogger(true); + } + + logger.updateFilePath(new IgniteClosure<String, String>() { + @Override public String apply(String s) { + return new File(outputDir, args.childProcId + ".log").getAbsolutePath(); + } + }); + + return logger; + } + + /** + * @param processArgs Process arguments. + * @return Child process instance. + */ + private static Args arguments(String[] processArgs) throws Exception { + Args args = new Args(); + + for (int i = 0; i < processArgs.length; i++) { + String arg = processArgs[i]; + + switch (arg) { + case "-cpid": { + if (i == processArgs.length - 1) + throw new Exception("Missing process ID for '-cpid' parameter"); + + String procIdStr = processArgs[++i]; + + args.childProcId = UUID.fromString(procIdStr); + + break; + } + + case "-ppid": { + if (i == processArgs.length - 1) + throw new Exception("Missing process ID for '-ppid' parameter"); + + String procIdStr = processArgs[++i]; + + args.parentProcId = UUID.fromString(procIdStr); + + break; + } + + case "-nid": { + if (i == processArgs.length - 1) + throw new Exception("Missing node ID for '-nid' parameter"); + + String nodeIdStr = processArgs[++i]; + + args.nodeId = UUID.fromString(nodeIdStr); + + break; + } + + case "-addr": { + if (i == processArgs.length - 1) + throw new Exception("Missing node address for '-addr' parameter"); + + args.addr = processArgs[++i]; + + break; + } + + case "-tport": { + if (i == processArgs.length - 1) + throw new Exception("Missing tcp port for '-tport' parameter"); + + args.tcpPort = Integer.parseInt(processArgs[++i]); + + break; + } + + case "-sport": { + if (i == processArgs.length - 1) + throw new Exception("Missing shared memory port for '-sport' parameter"); + + args.shmemPort = Integer.parseInt(processArgs[++i]); + + break; + } + + case "-out": { + if (i == processArgs.length - 1) + throw new Exception("Missing output folder name for '-out' parameter"); + + args.out = processArgs[++i]; + + break; + } + + case "-wd": { + if (i == processArgs.length - 1) + throw new Exception("Missing work folder name for '-wd' parameter"); + + args.workDir = processArgs[++i]; + + break; + } + } + } + + return args; + } + + /** + * Execution arguments. + */ + private static class Args { + /** Process ID. */ + private UUID childProcId; + + /** Process ID. */ + private UUID parentProcId; + + /** Process ID. */ + private UUID nodeId; + + /** Node address. */ + private String addr; + + /** TCP port */ + private int tcpPort; + + /** Shmem port. */ + private int shmemPort = -1; + + /** Output folder. */ + private String out; + + /** Work directory. */ + private String workDir; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java index 0000000,772e77d..722c36a mode 000000,100644..100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java @@@ -1,0 -1,34 +1,44 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.hadoop; + + import org.apache.ignite.configuration.*; ++import org.apache.ignite.marshaller.jdk.*; + + /** + * External test for sorting. + */ + public class HadoopSortingExternalTest extends HadoopSortingTest { + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(true); + + return cfg; + } ++ ++ /** {@inheritDoc} */ ++ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { ++ IgniteConfiguration cfg = super.getConfiguration(gridName); ++ ++ cfg.setMarshaller(new JdkMarshaller()); ++ ++ return cfg; ++ } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java index 0000000,59ac445..c8f42f7 mode 000000,100644..100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java @@@ -1,0 -1,211 +1,221 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + + import org.apache.hadoop.conf.*; + import org.apache.hadoop.fs.*; + import org.apache.hadoop.io.*; + import org.apache.hadoop.mapreduce.*; + import org.apache.hadoop.mapreduce.lib.input.*; + import org.apache.hadoop.mapreduce.lib.output.*; + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.igfs.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.hadoop.*; + import org.apache.ignite.internal.util.typedef.*; ++import org.apache.ignite.marshaller.jdk.*; + + import java.io.*; + import java.util.*; + + import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + + /** + * Job tracker self test. + */ + public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(true); + + return cfg; + } + ++ /** {@inheritDoc} */ ++ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { ++ IgniteConfiguration cfg = super.getConfiguration(gridName); ++ ++ cfg.setMarshaller(new JdkMarshaller()); ++ ++ return cfg; ++ } ++ + /** + * @throws Exception If failed. + */ + public void testSimpleTaskSubmit() throws Exception { + String testInputFile = "/test"; + + prepareTestFile(testInputFile); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + + job.setMapperClass(TestMapper.class); + job.setCombinerClass(TestReducer.class); + job.setReducerClass(TestReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + } + + /** + * @throws Exception If failed. + */ + public void testMapperException() throws Exception { + String testInputFile = "/test"; + + prepareTestFile(testInputFile); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + + job.setMapperClass(TestFailingMapper.class); + job.setCombinerClass(TestReducer.class); + job.setReducerClass(TestReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + IOException exp = X.cause(e, IOException.class); + + assertNotNull(exp); + assertEquals("Test failure", exp.getMessage()); + } + } + + /** + * @param filePath File path to prepare. + * @throws Exception If failed. + */ + private void prepareTestFile(String filePath) throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(igfsName); + + try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) { + PrintWriter wr = new PrintWriter(new OutputStreamWriter(out)); + + for (int i = 0; i < 1000; i++) + wr.println("Hello, world: " + i); + + wr.flush(); + } + } + + /** + * + */ + private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { + /** One constant. */ + private IntWritable one = new IntWritable(1); + + /** Line constant. */ + private Text line = new Text("line"); + + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + ctx.write(line, one); + } + } + + /** + * Failing mapper. + */ + private static class TestFailingMapper extends Mapper<Object, Text, Text, IntWritable> { + @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException { + throw new IOException("Test failure"); + } + } + + /** + * + */ + private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + /** Line constant. */ + private Text line = new Text("line"); + + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + super.setup(ctx); + } + + /** {@inheritDoc} */ + @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) + throws IOException, InterruptedException { + int s = 0; + + for (IntWritable val : values) + s += val.get(); + + System.out.println(">>>> Reduced: " + s); + + ctx.write(line, new IntWritable(s)); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1e83a269/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java index 0000000,a21633d..45fb3db mode 000000,100644..100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java @@@ -1,0 -1,209 +1,209 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; + + import org.apache.ignite.*; + import org.apache.ignite.internal.processors.hadoop.message.*; + import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.optimized.*; ++import org.apache.ignite.marshaller.jdk.*; + import org.apache.ignite.testframework.junits.common.*; + + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + + /** + * Tests Hadoop external communication component. + */ + public class HadoopExternalCommunicationSelfTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testSimpleMessageSendingTcp() throws Exception { + checkSimpleMessageSending(false); + } + + /** + * @throws Exception If failed. + */ + public void testSimpleMessageSendingShmem() throws Exception { + checkSimpleMessageSending(true); + } + + /** + * @throws Exception If failed. + */ + private void checkSimpleMessageSending(boolean useShmem) throws Exception { + UUID parentNodeId = UUID.randomUUID(); + - Marshaller marsh = new OptimizedMarshaller(); ++ Marshaller marsh = new JdkMarshaller(); + + IgniteLogger log = log(); + + HadoopExternalCommunication[] comms = new HadoopExternalCommunication[4]; + + try { + String name = "grid"; + + TestHadoopListener[] lsnrs = new TestHadoopListener[4]; + + int msgs = 10; + + for (int i = 0; i < comms.length; i++) { + comms[i] = new HadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log, + Executors.newFixedThreadPool(1), name + i); + + if (useShmem) + comms[i].setSharedMemoryPort(14000); + + lsnrs[i] = new TestHadoopListener(msgs); + + comms[i].setListener(lsnrs[i]); + + comms[i].start(); + } + + for (int r = 0; r < msgs; r++) { + for (int from = 0; from < comms.length; from++) { + for (int to = 0; to < comms.length; to++) { + if (from == to) + continue; + + comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to)); + } + } + } + + U.sleep(1000); + + for (TestHadoopListener lsnr : lsnrs) { + lsnr.await(3_000); + + assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size()); + } + } + finally { + for (HadoopExternalCommunication comm : comms) { + if (comm != null) + comm.stop(); + } + } + } + + /** + * + */ + private static class TestHadoopListener implements HadoopMessageListener { + /** Received messages (array list is safe because executor has one thread). */ + private Collection<TestMessage> msgs = new ArrayList<>(); + + /** Await latch. */ + private CountDownLatch receiveLatch; + + /** + * @param msgs Number of messages to await. + */ + private TestHadoopListener(int msgs) { + receiveLatch = new CountDownLatch(msgs); + } + + /** {@inheritDoc} */ + @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { + assert msg instanceof TestMessage; + + msgs.add((TestMessage)msg); + + receiveLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { + // No-op. + } + + /** + * @return Received messages. + */ + public Collection<TestMessage> messages() { + return msgs; + } + + /** + * @param millis Time to await. + * @throws InterruptedException If wait interrupted. + */ + public void await(int millis) throws InterruptedException { + receiveLatch.await(millis, TimeUnit.MILLISECONDS); + } + } + + /** + * + */ + private static class TestMessage implements HadoopMessage { + /** From index. */ + private int from; + + /** To index. */ + private int to; + + /** + * @param from From index. + * @param to To index. + */ + private TestMessage(int from, int to) { + this.from = from; + this.to = to; + } + + /** + * Required by {@link Externalizable}. + */ + public TestMessage() { + // No-op. + } + + /** + * @return From index. + */ + public int from() { + return from; + } + + /** + * @return To index. + */ + public int to() { + return to; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(from); + out.writeInt(to); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + from = in.readInt(); + to = in.readInt(); + } + } + }