Merge branch 'sprint-2' into ignite-394 Conflicts: modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1de65eb0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1de65eb0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1de65eb0 Branch: refs/heads/sprint-2 Commit: 1de65eb06834f2df87b373bdd9e55371bb5bcbac Parents: 1f2b802 40b96e1 Author: Artem Shutak <ashu...@gridgain.com> Authored: Tue Mar 10 13:05:10 2015 +0300 Committer: Artem Shutak <ashu...@gridgain.com> Committed: Tue Mar 10 13:05:10 2015 +0300 ---------------------------------------------------------------------- DISCLAIMER.txt | 15 + README.txt | 25 + RELEASE_NOTES.txt | 18 + assembly/release-base.xml | 20 +- assembly/release-fabric.xml | 5 - bin/ignitevisorcmd.bat | 7 +- bin/ignitevisorcmd.sh | 7 +- bin/include/target-classpath.bat | 2 +- bin/include/target-classpath.sh | 2 +- docs/ignite_readme.md | 100 -- docs/ignite_readme.pdf | Bin 77136 -> 0 bytes docs/release_notes.md | 16 - docs/release_notes.pdf | Bin 33174 -> 0 bytes docs/wiki/basic-concepts/async-support.md | 75 - docs/wiki/basic-concepts/getting-started.md | 218 --- docs/wiki/basic-concepts/ignite-life-cycel.md | 105 -- docs/wiki/basic-concepts/maven-setup.md | 68 - docs/wiki/basic-concepts/what-is-ignite.md | 31 - docs/wiki/basic-concepts/zero-deployment.md | 56 - docs/wiki/clustering/aws-config.md | 42 - docs/wiki/clustering/cluster-config.md | 176 -- docs/wiki/clustering/cluster-groups.md | 210 --- docs/wiki/clustering/cluster.md | 128 -- docs/wiki/clustering/leader-election.md | 59 - docs/wiki/clustering/network-config.md | 101 -- docs/wiki/clustering/node-local-map.md | 35 - docs/wiki/compute-grid/checkpointing.md | 238 --- .../compute-grid/collocate-compute-and-data.md | 29 - docs/wiki/compute-grid/compute-grid.md | 56 - docs/wiki/compute-grid/compute-tasks.md | 105 -- docs/wiki/compute-grid/distributed-closures.md | 107 -- docs/wiki/compute-grid/executor-service.md | 23 - docs/wiki/compute-grid/fault-tolerance.md | 79 - docs/wiki/compute-grid/job-scheduling.md | 69 - docs/wiki/compute-grid/load-balancing.md | 59 - docs/wiki/data-grid/affinity-collocation.md | 78 - docs/wiki/data-grid/automatic-db-integration.md | 102 -- docs/wiki/data-grid/cache-modes.md | 237 --- docs/wiki/data-grid/cache-queries.md | 164 -- docs/wiki/data-grid/data-grid.md | 68 - docs/wiki/data-grid/data-loading.md | 77 - docs/wiki/data-grid/evictions.md | 86 - docs/wiki/data-grid/hibernate-l2-cache.md | 173 -- docs/wiki/data-grid/jcache.md | 99 -- docs/wiki/data-grid/off-heap-memory.md | 180 -- docs/wiki/data-grid/persistent-store.md | 111 -- docs/wiki/data-grid/rebalancing.md | 105 -- docs/wiki/data-grid/transactions.md | 127 -- docs/wiki/data-grid/web-session-clustering.md | 236 --- .../distributed-data-structures/atomic-types.md | 97 - .../countdownlatch.md | 24 - .../distributed-data-structures/id-generator.md | 40 - .../queue-and-set.md | 116 -- .../distributed-events/automatic-batching.md | 16 - docs/wiki/distributed-events/events.md | 101 -- docs/wiki/distributed-file-system/igfs.md | 1 - docs/wiki/distributed-messaging/messaging.md | 73 - docs/wiki/http/configuration.md | 58 - docs/wiki/http/rest-api.md | 1646 ----------------- docs/wiki/release-notes/release-notes.md | 13 - docs/wiki/service-grid/cluster-singletons.md | 94 - docs/wiki/service-grid/service-configuration.md | 33 - docs/wiki/service-grid/service-example.md | 94 - docs/wiki/service-grid/service-grid.md | 62 - .../ComputeFibonacciContinuationExample.java | 12 +- .../examples/datagrid/CacheApiExample.java | 2 +- .../examples/ScalarContinuationExample.scala | 12 +- .../client/suite/IgniteClientTestSuite.java | 3 +- .../org/apache/ignite/IgniteFileSystem.java | 24 +- .../apache/ignite/IgniteSystemProperties.java | 35 +- .../org/apache/ignite/cache/CacheManager.java | 1 - .../igfs/IgfsDirectoryNotEmptyException.java | 42 + .../ignite/igfs/IgfsFileNotFoundException.java | 44 - .../ignite/igfs/IgfsPathNotFoundException.java | 44 + .../igfs/secondary/IgfsSecondaryFileSystem.java | 10 +- .../internal/ComputeTaskInternalFuture.java | 44 +- .../ignite/internal/GridJobContextImpl.java | 6 +- .../ignite/internal/GridJobSessionImpl.java | 2 +- .../ignite/internal/GridKernalGatewayImpl.java | 26 - .../ignite/internal/GridTaskSessionImpl.java | 2 +- .../ignite/internal/IgniteInternalFuture.java | 79 +- .../apache/ignite/internal/IgniteKernal.java | 24 +- .../internal/client/GridClientFuture.java | 9 +- .../client/impl/GridClientDataImpl.java | 2 +- .../client/impl/GridClientFutureAdapter.java | 26 +- .../connection/GridClientNioTcpConnection.java | 5 +- .../impl/GridTcpRouterNioListenerAdapter.java | 2 +- .../internal/cluster/IgniteClusterImpl.java | 9 +- .../internal/executor/GridExecutorService.java | 2 +- .../igfs/common/IgfsControlResponse.java | 5 +- .../managers/communication/GridIoManager.java | 43 +- .../discovery/GridDiscoveryManager.java | 10 +- .../eventstorage/GridEventStorageManager.java | 6 +- .../affinity/GridAffinityAssignmentCache.java | 17 +- .../affinity/GridAffinityProcessor.java | 2 +- .../processors/cache/GridCacheAdapter.java | 91 +- .../cache/GridCacheAffinityManager.java | 2 +- .../cache/GridCacheDeploymentManager.java | 2 +- .../cache/GridCacheEvictionManager.java | 27 +- .../processors/cache/GridCacheGateway.java | 6 + .../processors/cache/GridCacheIoManager.java | 77 +- .../processors/cache/GridCacheMapEntry.java | 14 +- .../processors/cache/GridCacheMessage.java | 7 - .../cache/GridCacheMultiTxFuture.java | 54 +- .../processors/cache/GridCacheMvcc.java | 3 +- .../processors/cache/GridCacheMvccManager.java | 34 +- .../GridCachePartitionExchangeManager.java | 6 +- .../cache/GridCachePreloaderAdapter.java | 4 +- .../processors/cache/GridCacheProcessor.java | 2 +- .../cache/GridCacheProjectionImpl.java | 12 +- .../cache/GridCacheSharedContext.java | 2 +- .../processors/cache/GridCacheUtils.java | 11 +- .../processors/cache/IgniteCacheProxy.java | 2 +- ...ridCacheOptimisticCheckPreparedTxFuture.java | 25 +- .../distributed/GridCacheTxFinishSync.java | 2 +- .../GridDistributedCacheAdapter.java | 4 +- .../GridDistributedTxRemoteAdapter.java | 6 +- .../dht/GridDhtAssignmentFetchFuture.java | 22 +- .../distributed/dht/GridDhtCacheAdapter.java | 9 +- .../distributed/dht/GridDhtCacheEntry.java | 14 +- .../distributed/dht/GridDhtEmbeddedFuture.java | 43 +- .../distributed/dht/GridDhtFinishedFuture.java | 22 +- .../cache/distributed/dht/GridDhtGetFuture.java | 45 +- .../distributed/dht/GridDhtLocalPartition.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 35 +- .../dht/GridDhtTransactionalCacheAdapter.java | 35 +- .../distributed/dht/GridDhtTxFinishFuture.java | 38 +- .../cache/distributed/dht/GridDhtTxLocal.java | 4 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 16 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 48 +- .../dht/GridPartitionedGetFuture.java | 44 +- .../dht/atomic/GridDhtAtomicCache.java | 20 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 20 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 43 +- .../dht/colocated/GridDhtColocatedCache.java | 30 +- .../colocated/GridDhtColocatedLockFuture.java | 43 +- .../dht/preloader/GridDhtForceKeysFuture.java | 43 +- .../preloader/GridDhtPartitionDemandPool.java | 9 +- .../GridDhtPartitionsExchangeFuture.java | 48 +- .../dht/preloader/GridDhtPreloader.java | 22 +- .../distributed/near/GridNearAtomicCache.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 6 +- .../distributed/near/GridNearGetFuture.java | 51 +- .../distributed/near/GridNearLockFuture.java | 43 +- .../near/GridNearTransactionalCache.java | 2 +- .../near/GridNearTxFinishFuture.java | 32 +- .../cache/distributed/near/GridNearTxLocal.java | 83 +- .../near/GridNearTxPrepareFuture.java | 48 +- .../processors/cache/local/GridLocalCache.java | 2 +- .../cache/local/GridLocalLockFuture.java | 23 +- .../processors/cache/local/GridLocalTx.java | 10 +- .../cache/local/GridLocalTxFuture.java | 59 +- .../local/atomic/GridLocalAtomicCache.java | 8 +- .../GridCacheDistributedFieldsQueryFuture.java | 13 +- .../query/GridCacheDistributedQueryFuture.java | 11 - .../query/GridCacheDistributedQueryManager.java | 4 +- .../query/GridCacheFieldsQueryErrorFuture.java | 53 - .../query/GridCacheLocalFieldsQueryFuture.java | 13 +- .../cache/query/GridCacheLocalQueryFuture.java | 15 +- .../cache/query/GridCacheQueryErrorFuture.java | 5 +- .../query/GridCacheQueryFutureAdapter.java | 14 +- .../cache/query/GridCacheQueryManager.java | 11 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 25 +- .../transactions/IgniteTxLocalAdapter.java | 320 ++-- .../cache/transactions/IgniteTxManager.java | 18 +- .../transactions/TransactionProxyImpl.java | 2 +- .../closure/GridClosureProcessor.java | 38 +- .../continuous/GridContinuousProcessor.java | 36 +- .../datastream/IgniteDataStreamerFuture.java | 13 +- .../datastream/IgniteDataStreamerImpl.java | 39 +- .../datastream/IgniteDataStreamerProcessor.java | 2 +- .../GridCacheAtomicSequenceImpl.java | 4 +- .../processors/hadoop/HadoopNoopProcessor.java | 2 +- .../processors/igfs/IgfsDataManager.java | 36 +- .../processors/igfs/IgfsDeleteWorker.java | 2 +- .../igfs/IgfsDirectoryNotEmptyException.java | 44 - .../internal/processors/igfs/IgfsImpl.java | 30 +- .../processors/igfs/IgfsInputStreamImpl.java | 10 +- .../processors/igfs/IgfsIpcHandler.java | 4 +- .../processors/igfs/IgfsMetaManager.java | 22 +- .../processors/igfs/IgfsOutputStreamImpl.java | 2 +- .../internal/processors/igfs/IgfsServer.java | 3 +- .../processors/job/GridJobProcessor.java | 4 +- .../processors/query/GridQueryProcessor.java | 6 +- .../processors/resource/GridResourceUtils.java | 4 +- .../processors/rest/GridRestProcessor.java | 14 +- .../handlers/cache/GridCacheCommandHandler.java | 6 +- .../cache/GridCacheQueryCommandHandler.java | 4 +- .../DataStructuresCommandHandler.java | 4 +- .../handlers/task/GridTaskCommandHandler.java | 6 +- .../top/GridTopologyCommandHandler.java | 4 +- .../version/GridVersionCommandHandler.java | 2 +- .../tcp/GridTcpMemcachedNioListener.java | 6 +- .../protocols/tcp/GridTcpRestNioListener.java | 6 +- .../service/GridServiceDeploymentFuture.java | 9 +- .../service/GridServiceProcessor.java | 10 +- .../GridStreamerStageExecutionFuture.java | 32 +- .../processors/streamer/IgniteStreamerImpl.java | 23 +- .../internal/util/GridSerializableFuture.java | 28 - .../ignite/internal/util/GridThreadLocal.java | 175 -- .../ignite/internal/util/GridThreadLocalEx.java | 210 --- .../ignite/internal/util/IgniteUtils.java | 36 +- .../util/future/GridCompoundFuture.java | 52 +- .../util/future/GridCompoundIdentityFuture.java | 18 +- .../util/future/GridEmbeddedFuture.java | 77 +- .../util/future/GridFinishedFuture.java | 158 +- .../util/future/GridFinishedFutureEx.java | 197 --- .../internal/util/future/GridFutureAdapter.java | 365 +--- .../util/future/GridFutureAdapterEx.java | 517 ------ .../util/future/GridFutureChainListener.java | 18 +- .../util/future/IgniteFinishedFutureImpl.java | 27 +- .../util/future/IgniteFinishedFutureImplEx.java | 30 - .../internal/util/future/IgniteFutureImpl.java | 31 +- .../internal/util/io/GridFilenameUtils.java | 2 +- .../ignite/internal/util/lang/GridFunc.java | 90 +- .../internal/util/lang/GridPlainFuture.java | 79 - .../util/lang/GridPlainFutureAdapter.java | 299 ---- .../util/nio/GridNioEmbeddedFuture.java | 12 +- .../util/nio/GridNioFinishedFuture.java | 77 +- .../ignite/internal/util/nio/GridNioFuture.java | 84 +- .../internal/util/nio/GridNioFutureImpl.java | 282 +-- .../ignite/internal/util/nio/GridNioServer.java | 7 +- .../util/nio/GridTcpNioCommunicationClient.java | 18 +- .../ignite/internal/util/worker/GridWorker.java | 27 - .../internal/util/worker/GridWorkerFuture.java | 20 - .../visor/cache/VisorCacheClearTask.java | 2 +- .../visor/node/VisorGridConfiguration.java | 6 +- .../org/apache/ignite/lang/IgniteFuture.java | 67 +- .../lang/IgniteFutureCancelledException.java | 3 - .../lang/IgniteFutureTimeoutException.java | 3 - .../communication/tcp/TcpCommunicationSpi.java | 32 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../igfs/IgfsFragmentizerAbstractSelfTest.java | 4 +- .../internal/GridMultipleJobsSelfTest.java | 2 +- .../GridTaskFutureImplStopGridSelfTest.java | 2 +- .../internal/GridTaskListenerSelfTest.java | 2 +- .../GridCacheAsyncOperationsLimitSelfTest.java | 3 +- ...dCacheAtomicUsersAffinityMapperSelfTest.java | 7 +- .../GridCacheFinishPartitionsSelfTest.java | 6 +- .../GridCachePartitionedLocalStoreSelfTest.java | 7 - ...chePartitionedOffHeapLocalStoreSelfTest.java | 7 - .../cache/GridCachePutAllFailoverSelfTest.java | 6 +- .../GridCacheReferenceCleanupSelfTest.java | 2 +- .../GridCacheReplicatedLocalStoreSelfTest.java | 7 - ...heReplicatedUsersAffinityMapperSelfTest.java | 7 +- ...ridCacheTxPartitionedLocalStoreSelfTest.java | 7 - .../GridCacheTxUsersAffinityMapperSelfTest.java | 7 +- .../distributed/GridCacheEventAbstractTest.java | 17 +- .../processors/igfs/IgfsAbstractSelfTest.java | 28 +- .../igfs/IgfsDualAbstractSelfTest.java | 18 +- .../cache/GridCacheCommandHandlerSelfTest.java | 42 +- .../util/future/GridCompoundFutureSelfTest.java | 30 +- .../util/future/GridEmbeddedFutureSelfTest.java | 13 +- .../util/future/GridFinishedFutureSelfTest.java | 103 -- .../util/future/GridFutureAdapterSelfTest.java | 115 +- .../future/GridFutureListenPerformanceTest.java | 22 +- .../util/future/IgniteFutureImplTest.java | 99 +- .../util/future/nio/GridNioFutureSelfTest.java | 8 +- .../lang/GridFutureListenPerformanceTest.java | 2 +- .../loadtests/colocation/GridTestMain.java | 2 +- ...GridJobExecutionLoadTestClientSemaphore.java | 2 +- ...JobExecutionSingleNodeSemaphoreLoadTest.java | 2 +- .../mergesort/GridMergeSortLoadTask.java | 2 +- .../ignite/messaging/GridMessagingSelfTest.java | 12 +- .../GridCacheStoreValueBytesTest.java | 4 +- .../ignite/testframework/GridTestUtils.java | 13 +- .../testsuites/IgniteLangSelfTestSuite.java | 1 - .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 10 +- .../processors/hadoop/igfs/HadoopIgfsEx.java | 4 +- .../hadoop/igfs/HadoopIgfsFuture.java | 4 +- .../hadoop/igfs/HadoopIgfsInProc.java | 9 +- .../hadoop/igfs/HadoopIgfsInputStream.java | 6 +- .../processors/hadoop/igfs/HadoopIgfsIo.java | 6 +- .../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 5 +- .../hadoop/igfs/HadoopIgfsOutProc.java | 50 +- .../processors/hadoop/igfs/HadoopIgfsUtils.java | 3 +- .../hadoop/jobtracker/HadoopJobTracker.java | 20 +- .../proto/HadoopProtocolJobStatusTask.java | 2 +- .../hadoop/shuffle/HadoopShuffle.java | 4 +- .../hadoop/shuffle/HadoopShuffleJob.java | 16 +- .../external/HadoopExternalTaskExecutor.java | 28 +- .../child/HadoopChildProcessRunner.java | 10 +- .../HadoopExternalCommunication.java | 7 +- .../HadoopTcpNioCommunicationClient.java | 12 +- .../java/org/apache/ignite/igfs/IgfsLoad.java | 549 ------ .../ignite/loadtests/igfs/IgfsNodeStartup.java | 48 - .../igfs/IgfsPerformanceBenchmark.java | 281 --- .../processors/query/h2/IgniteH2Indexing.java | 2 +- .../h2/twostep/GridReduceQueryExecutor.java | 8 +- .../cache/jta/GridCacheXAResource.java | 9 +- .../processors/schedule/ScheduleFutureImpl.java | 205 +-- .../schedule/GridScheduleSelfTest.java | 4 +- .../ignite/visor/commands/VisorConsole.scala | 61 +- .../visor/commands/ack/VisorAckCommand.scala | 4 +- .../visor/commands/gc/VisorGcCommand.scala | 2 - .../visor/commands/ping/VisorPingCommand.scala | 2 +- .../scala/org/apache/ignite/visor/visor.scala | 34 +- pom.xml | 33 + .../basic-concepts/async-support.md | 92 + .../basic-concepts/getting-started.md | 235 +++ .../basic-concepts/ignite-life-cycel.md | 122 ++ .../documentation/basic-concepts/maven-setup.md | 85 + .../basic-concepts/what-is-ignite.md | 48 + .../basic-concepts/zero-deployment.md | 73 + wiki/documentation/clustering/aws-config.md | 59 + wiki/documentation/clustering/cluster-config.md | 193 ++ wiki/documentation/clustering/cluster-groups.md | 227 +++ wiki/documentation/clustering/cluster.md | 145 ++ .../documentation/clustering/leader-election.md | 76 + wiki/documentation/clustering/network-config.md | 118 ++ wiki/documentation/clustering/node-local-map.md | 52 + .../documentation/compute-grid/checkpointing.md | 255 +++ .../compute-grid/collocate-compute-and-data.md | 46 + wiki/documentation/compute-grid/compute-grid.md | 73 + .../documentation/compute-grid/compute-tasks.md | 122 ++ .../compute-grid/distributed-closures.md | 124 ++ .../compute-grid/executor-service.md | 40 + .../compute-grid/fault-tolerance.md | 96 + .../compute-grid/job-scheduling.md | 86 + .../compute-grid/load-balancing.md | 76 + .../data-grid/affinity-collocation.md | 95 + .../data-grid/automatic-db-integration.md | 119 ++ wiki/documentation/data-grid/cache-modes.md | 254 +++ wiki/documentation/data-grid/cache-queries.md | 181 ++ wiki/documentation/data-grid/data-grid.md | 85 + wiki/documentation/data-grid/data-loading.md | 94 + wiki/documentation/data-grid/evictions.md | 103 ++ .../data-grid/hibernate-l2-cache.md | 190 ++ wiki/documentation/data-grid/jcache.md | 116 ++ wiki/documentation/data-grid/off-heap-memory.md | 197 +++ .../documentation/data-grid/persistent-store.md | 128 ++ wiki/documentation/data-grid/rebalancing.md | 122 ++ wiki/documentation/data-grid/transactions.md | 144 ++ .../data-grid/web-session-clustering.md | 253 +++ .../distributed-data-structures/atomic-types.md | 114 ++ .../countdownlatch.md | 41 + .../distributed-data-structures/id-generator.md | 57 + .../queue-and-set.md | 133 ++ .../distributed-events/automatic-batching.md | 33 + wiki/documentation/distributed-events/events.md | 118 ++ .../distributed-file-system/igfs.md | 18 + .../distributed-messaging/messaging.md | 90 + wiki/documentation/http/configuration.md | 67 + wiki/documentation/http/rest-api.md | 1663 ++++++++++++++++++ .../release-notes/release-notes.md | 30 + .../service-grid/cluster-singletons.md | 111 ++ .../service-grid/service-configuration.md | 50 + .../service-grid/service-example.md | 111 ++ wiki/documentation/service-grid/service-grid.md | 79 + wiki/licence-prepender.sh | 51 + 351 files changed, 9001 insertions(+), 12888 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java index b6aa15c,0000000..8c29898 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java @@@ -1,75 -1,0 +1,66 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastream; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + - import java.io.*; - +/** + * Data streamer future. + */ +class IgniteDataStreamerFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 0L; - - /** Data streamer. */ ++ /** Data loader. */ + @GridToStringExclude + private IgniteDataStreamerImpl dataLdr; + + /** + * Default constructor for {@link Externalizable} support. + */ + public IgniteDataStreamerFuture() { + // No-op. + } + + /** + * @param ctx Context. + * @param dataLdr Data streamer. + */ - IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) { - super(ctx); - ++ IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) { + assert dataLdr != null; + + this.dataLdr = dataLdr; + } + + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteCheckedException { - checkValid(); - + if (onCancelled()) { + dataLdr.closeEx(true); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteDataStreamerFuture.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java index faba034,0000000..7867c28 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java @@@ -1,1469 -1,0 +1,1470 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastream; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.processors.dr.*; +import org.apache.ignite.internal.processors.portable.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.Map.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + +/** + * Data streamer implementation. + */ +@SuppressWarnings("unchecked") +public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { + /** Isolated updater. */ + private static final Updater ISOLATED_UPDATER = new IsolatedUpdater(); + + /** Cache updater. */ + private Updater<K, V> updater = ISOLATED_UPDATER; + + /** */ + private byte[] updaterBytes; + + /** Max remap count before issuing an error. */ + private static final int DFLT_MAX_REMAP_CNT = 32; + + /** Log reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + ++ /** Logger. */ ++ private static IgniteLogger log; ++ + /** Cache name ({@code null} for default cache). */ + private final String cacheName; + + /** Portable enabled flag. */ + private final boolean portableEnabled; + + /** + * If {@code true} then data will be transferred in compact format (only keys and values). + * Otherwise full map entry will be transferred (this is requires by DR internal logic). + */ + private final boolean compact; + + /** Per-node buffer size. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private int bufSize = DFLT_PER_NODE_BUFFER_SIZE; + + /** */ + private int parallelOps = DFLT_MAX_PARALLEL_OPS; + + /** */ + private long autoFlushFreq; + + /** Mapping. */ + @GridToStringInclude + private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>(); + - /** Logger. */ - private final IgniteLogger log; - + /** Discovery listener. */ + private final GridLocalEventListener discoLsnr; + + /** Context. */ + private final GridKernalContext ctx; + + /** Communication topic for responses. */ + private final Object topic; + + /** */ + private byte[] topicBytes; + - /** {@code True} if data streamer has been cancelled. */ ++ /** {@code True} if data loader has been cancelled. */ + private volatile boolean cancelled; + - /** Active futures of this data streamer. */ ++ /** Active futures of this data loader. */ + @GridToStringInclude + private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>(); + + /** Closure to remove from active futures. */ + @GridToStringExclude + private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + boolean rmv = activeFuts.remove(t); + + assert rmv; + } + }; + + /** Job peer deploy aware. */ + private volatile GridPeerDeployAware jobPda; + + /** Deployment class. */ + private Class<?> depCls; + + /** Future to track loading finish. */ + private final GridFutureAdapter<?> fut; + + /** Public API future to track loading finish. */ + private final IgniteFuture<?> publicFut; + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Closed flag. */ + private final AtomicBoolean closed = new AtomicBoolean(); + + /** */ + private volatile long lastFlushTime = U.currentTimeMillis(); + + /** */ + private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ; + + /** */ + private boolean skipStore; + + /** */ + private int maxRemapCnt = DFLT_MAX_REMAP_CNT; + + /** Whether a warning at {@link IgniteDataStreamerImpl#allowOverwrite()} printed */ + private static boolean isWarningPrinted; + + /** + * @param ctx Grid kernal context. + * @param cacheName Cache name. + * @param flushQ Flush queue. + * @param compact If {@code true} data is transferred in compact mode (only keys and values). + * Otherwise full map entry will be transferred (this is required by DR internal logic). + */ + public IgniteDataStreamerImpl( + final GridKernalContext ctx, + @Nullable final String cacheName, + DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ, + boolean compact + ) { + assert ctx != null; + + this.ctx = ctx; + this.cacheName = cacheName; + this.flushQ = flushQ; + this.compact = compact; + - log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class); ++ if (log == null) ++ log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class); + + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); + + if (node == null) + throw new IllegalStateException("Cache doesn't exist: " + cacheName); + + portableEnabled = ctx.portable().portableEnabled(node, cacheName); + + discoLsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + UUID id = discoEvt.eventNode().id(); + + // Remap regular mappings. + final Buffer buf = bufMappings.remove(id); + + if (buf != null) { + // Only async notification is possible since + // discovery thread may be trapped otherwise. + ctx.closure().callLocalSafe( + new Callable<Object>() { + @Override public Object call() throws Exception { + buf.onNodeLeft(); + + return null; + } + }, + true /* system pool */ + ); + } + } + }; + + ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); + + // Generate unique topic for this loader. + topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId())); + + ctx.io().addMessageListener(topic, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof GridDataLoadResponse; + + GridDataLoadResponse res = (GridDataLoadResponse)msg; + + if (log.isDebugEnabled()) + log.debug("Received data load response: " + res); + + Buffer buf = bufMappings.get(nodeId); + + if (buf != null) + buf.onResponse(res); + + else if (log.isDebugEnabled()) + log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", "); + } + }); + + if (log.isDebugEnabled()) + log.debug("Added response listener within topic: " + topic); + + fut = new IgniteDataStreamerFuture(ctx, this); + + publicFut = new IgniteFutureImpl<>(fut); + } + + /** + * Enters busy lock. + */ + private void enterBusy() { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Data streamer has been closed."); + } + + /** + * Leaves busy lock. + */ + private void leaveBusy() { + busyLock.leaveBusy(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> future() { + return publicFut; + } + + /** + * @return Internal future. + */ + public IgniteInternalFuture<?> internalFuture() { + return fut; + } + + /** {@inheritDoc} */ + @Override public void deployClass(Class<?> depCls) { + this.depCls = depCls; + } + + /** {@inheritDoc} */ + @Override public void updater(Updater<K, V> updater) { + A.notNull(updater, "updater"); + + this.updater = updater; + } + + /** {@inheritDoc} */ + @Override public boolean allowOverwrite() { + boolean allow = updater != ISOLATED_UPDATER; + + if (!allow && !isWarningPrinted) { + synchronized (this) { + if (!isWarningPrinted) { + log.warning("Data streamer will not overwrite existing cache entries for better performance " + + "(to change, set allowOverwrite to true)"); + + isWarningPrinted = true; + } + } + } + + return allow; + } + + /** {@inheritDoc} */ + @Override public void allowOverwrite(boolean allow) { + if (allow == allowOverwrite()) + return; + + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); + + if (node == null) + throw new IgniteException("Failed to get node for cache: " + cacheName); + + updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER; + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return skipStore; + } + + /** {@inheritDoc} */ + @Override public void skipStore(boolean skipStore) { + this.skipStore = skipStore; + } + + /** {@inheritDoc} */ + @Override @Nullable public String cacheName() { + return cacheName; + } + + /** {@inheritDoc} */ + @Override public int perNodeBufferSize() { + return bufSize; + } + + /** {@inheritDoc} */ + @Override public void perNodeBufferSize(int bufSize) { + A.ensure(bufSize > 0, "bufSize > 0"); + + this.bufSize = bufSize; + } + + /** {@inheritDoc} */ + @Override public int perNodeParallelStreamOperations() { + return parallelOps; + } + + /** {@inheritDoc} */ + @Override public void perNodeParallelStreamOperations(int parallelOps) { + this.parallelOps = parallelOps; + } + + /** {@inheritDoc} */ + @Override public long autoFlushFrequency() { + return autoFlushFreq; + } + + /** {@inheritDoc} */ + @Override public void autoFlushFrequency(long autoFlushFreq) { + A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0"); + + long old = this.autoFlushFreq; + + if (autoFlushFreq != old) { + this.autoFlushFreq = autoFlushFreq; + + if (autoFlushFreq != 0 && old == 0) + flushQ.add(this); + else if (autoFlushFreq == 0) + flushQ.remove(this); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { + A.notNull(entries, "entries"); + + return addData(entries.entrySet()); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) { + A.notEmpty(entries, "entries"); + + enterBusy(); + + try { - GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx); ++ GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); + - resFut.listenAsync(rmvActiveFut); ++ resFut.listen(rmvActiveFut); + + activeFuts.add(resFut); + + Collection<K> keys = null; + + if (entries.size() > 1) { + keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); + + for (Map.Entry<K, V> entry : entries) + keys.add(entry.getKey()); + } + + load0(entries, resFut, keys, 0); + + return new IgniteFutureImpl<>(resFut); + } + catch (IgniteException e) { - return new IgniteFinishedFutureImpl<>(ctx, e); ++ return new IgniteFinishedFutureImpl<>(e); + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) { + A.notNull(entry, "entry"); + + return addData(F.asList(entry)); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(K key, V val) { + A.notNull(key, "key"); + + return addData(new Entry0<>(key, val)); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> removeData(K key) { + return addData(key, null); + } + + /** + * @param entries Entries. + * @param resFut Result future. + * @param activeKeys Active keys. + * @param remaps Remaps count. + */ + private void load0( + Collection<? extends Map.Entry<K, V>> entries, + final GridFutureAdapter<Object> resFut, + @Nullable final Collection<K> activeKeys, + final int remaps + ) { + assert entries != null; + + Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>(); + + boolean initPda = ctx.deploy().enabled() && jobPda == null; + + for (Map.Entry<K, V> entry : entries) { + List<ClusterNode> nodes; + + try { + K key = entry.getKey(); + + assert key != null; + + if (initPda) { + jobPda = new DataStreamerPda(key, entry.getValue(), updater); + + initPda = false; + } + + nodes = nodes(key); + } + catch (IgniteCheckedException e) { + resFut.onDone(e); + + return; + } + + if (F.isEmpty(nodes)) { + resFut.onDone(new ClusterTopologyException("Failed to map key to node " + + "(no nodes with cache found in topology) [infos=" + entries.size() + + ", cacheName=" + cacheName + ']')); + + return; + } + + for (ClusterNode node : nodes) { + Collection<Map.Entry<K, V>> col = mappings.get(node); + + if (col == null) + mappings.put(node, col = new ArrayList<>()); + + col.add(entry); + } + } + + for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) { + final UUID nodeId = e.getKey().id(); + + Buffer buf = bufMappings.get(nodeId); + + if (buf == null) { + Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey())); + + if (old != null) + buf = old; + } + + final Collection<Map.Entry<K, V>> entriesForNode = e.getValue(); + + IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + try { + t.get(); + + if (activeKeys != null) { + for (Map.Entry<K, V> e : entriesForNode) + activeKeys.remove(e.getKey()); + + if (activeKeys.isEmpty()) + resFut.onDone(); + } + else { + assert entriesForNode.size() == 1; + + // That has been a single key, + // so complete result future right away. + resFut.onDone(); + } + } + catch (IgniteCheckedException e1) { + if (log.isDebugEnabled()) + log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']'); + + if (cancelled) { + resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + + IgniteDataStreamerImpl.this, e1)); + } + else if (remaps + 1 > maxRemapCnt) { + resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + + remaps), e1); + } + else + load0(entriesForNode, resFut, activeKeys, remaps + 1); + } + } + }; + + GridFutureAdapter<?> f; + + try { + f = buf.update(entriesForNode, lsnr); + } + catch (IgniteInterruptedCheckedException e1) { + resFut.onDone(e1); + + return; + } + + if (ctx.discovery().node(nodeId) == null) { + if (bufMappings.remove(nodeId, buf)) + buf.onNodeLeft(); + + if (f != null) + f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + nodeId)); + } + } + } + + /** + * @param key Key to map. + * @return Nodes to send requests to. + * @throws IgniteCheckedException If failed. + */ + private List<ClusterNode> nodes(K key) throws IgniteCheckedException { + GridAffinityProcessor aff = ctx.affinity(); + + return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) : + Collections.singletonList(aff.mapKeyToNode(cacheName, key)); + } + + /** + * Performs flush. + * + * @throws IgniteCheckedException If failed. + */ + private void doFlush() throws IgniteCheckedException { + lastFlushTime = U.currentTimeMillis(); + + List<IgniteInternalFuture> activeFuts0 = null; + + int doneCnt = 0; + + for (IgniteInternalFuture<?> f : activeFuts) { + if (!f.isDone()) { + if (activeFuts0 == null) + activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2)); + + activeFuts0.add(f); + } + else { + f.get(); + + doneCnt++; + } + } + + if (activeFuts0 == null || activeFuts0.isEmpty()) + return; + + while (true) { + Queue<IgniteInternalFuture<?>> q = null; + + for (Buffer buf : bufMappings.values()) { + IgniteInternalFuture<?> flushFut = buf.flush(); + + if (flushFut != null) { + if (q == null) + q = new ArrayDeque<>(bufMappings.size() * 2); + + q.add(flushFut); + } + } + + if (q != null) { + assert !q.isEmpty(); + + boolean err = false; + + for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to flush buffer: " + e); + + err = true; + } + } + + if (err) + // Remaps needed - flush buffers. + continue; + } + + doneCnt = 0; + + for (int i = 0; i < activeFuts0.size(); i++) { + IgniteInternalFuture f = activeFuts0.get(i); + + if (f == null) + doneCnt++; + else if (f.isDone()) { + f.get(); + + doneCnt++; + + activeFuts0.set(i, null); + } + else + break; + } + + if (doneCnt == activeFuts0.size()) + return; + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public void flush() throws IgniteException { + enterBusy(); + + try { + doFlush(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + leaveBusy(); + } + } + + /** + * Flushes every internal buffer if buffer was flushed before passed in + * threshold. + * <p> + * Does not wait for result and does not fail on errors assuming that this method + * should be called periodically. + */ + @Override public void tryFlush() throws IgniteInterruptedException { + if (!busyLock.enterBusy()) + return; + + try { + for (Buffer buf : bufMappings.values()) + buf.flush(); + + lastFlushTime = U.currentTimeMillis(); + } + catch (IgniteInterruptedCheckedException e) { + throw U.convertException(e); + } + finally { + leaveBusy(); + } + } + + /** + * @param cancel {@code True} to close with cancellation. + * @throws IgniteException If failed. + */ + @Override public void close(boolean cancel) throws IgniteException { + try { + closeEx(cancel); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * @param cancel {@code True} to close with cancellation. + * @throws IgniteCheckedException If failed. + */ + public void closeEx(boolean cancel) throws IgniteCheckedException { + if (!closed.compareAndSet(false, true)) + return; + + busyLock.block(); + + if (log.isDebugEnabled()) + log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']'); + + IgniteCheckedException e = null; + + try { + // Assuming that no methods are called on this loader after this method is called. + if (cancel) { + cancelled = true; + + for (Buffer buf : bufMappings.values()) + buf.cancelAll(); + } + else + doFlush(); + + ctx.event().removeLocalEventListener(discoLsnr); + + ctx.io().removeMessageListener(topic); + } + catch (IgniteCheckedException e0) { + e = e0; + } + + fut.onDone(null, e); + + if (e != null) + throw e; + } + + /** + * @return {@code true} If the loader is closed. + */ + boolean isClosed() { + return fut.isDone(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + close(false); + } + + /** + * @return Max remap count. + */ + public int maxRemapCount() { + return maxRemapCnt; + } + + /** + * @param maxRemapCnt New max remap count. + */ + public void maxRemapCount(int maxRemapCnt) { + this.maxRemapCnt = maxRemapCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteDataStreamerImpl.class, this); + } + + /** {@inheritDoc} */ + @Override public long getDelay(TimeUnit unit) { + return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + /** + * @return Next flush time. + */ + private long nextFlushTime() { + return lastFlushTime + autoFlushFreq; + } + + /** {@inheritDoc} */ + @Override public int compareTo(Delayed o) { + return nextFlushTime() > ((IgniteDataStreamerImpl)o).nextFlushTime() ? 1 : -1; + } + + /** + * + */ + private class Buffer { + /** Node. */ + private final ClusterNode node; + + /** Active futures. */ + private final Collection<IgniteInternalFuture<Object>> locFuts; + + /** Buffered entries. */ + private List<Map.Entry<K, V>> entries; + + /** */ + @GridToStringExclude + private GridFutureAdapter<Object> curFut; + + /** Local node flag. */ + private final boolean isLocNode; + + /** ID generator. */ + private final AtomicLong idGen = new AtomicLong(); + + /** Active futures. */ + private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs; + + /** */ + private final Semaphore sem; + + /** Closure to signal on task finish. */ + @GridToStringExclude + private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { + signalTaskFinished(t); + } + }; + + /** + * @param node Node. + */ + Buffer(ClusterNode node) { + assert node != null; + + this.node = node; + + locFuts = new GridConcurrentHashSet<>(); + reqs = new ConcurrentHashMap8<>(); + + // Cache local node flag. + isLocNode = node.equals(ctx.discovery().localNode()); + + entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); - curFut.listenAsync(signalC); ++ curFut = new GridFutureAdapter<>(); ++ curFut.listen(signalC); + + sem = new Semaphore(parallelOps); + } + + /** + * @param newEntries Infos. + * @param lsnr Listener for the operation future. + * @throws IgniteInterruptedCheckedException If failed. + * @return Future for operation. + */ + @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries, + IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException { + List<Map.Entry<K, V>> entries0 = null; + GridFutureAdapter<Object> curFut0; + + synchronized (this) { + curFut0 = curFut; + - curFut0.listenAsync(lsnr); ++ curFut0.listen(lsnr); + + for (Map.Entry<K, V> entry : newEntries) + entries.add(entry); + + if (entries.size() >= bufSize) { + entries0 = entries; + + entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); - curFut.listenAsync(signalC); ++ curFut = new GridFutureAdapter<>(); ++ curFut.listen(signalC); + } + } + + if (entries0 != null) { + submit(entries0, curFut0); + + if (cancelled) + curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this)); + } + + return curFut0; + } + + /** + * @return Fresh collection with some space for outgrowth. + */ + private List<Map.Entry<K, V>> newEntries() { + return new ArrayList<>((int)(bufSize * 1.2)); + } + + /** + * @return Future if any submitted. + * + * @throws IgniteInterruptedCheckedException If thread has been interrupted. + */ + @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException { + List<Map.Entry<K, V>> entries0 = null; + GridFutureAdapter<Object> curFut0 = null; + + synchronized (this) { + if (!entries.isEmpty()) { + entries0 = entries; + curFut0 = curFut; + + entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); - curFut.listenAsync(signalC); ++ curFut = new GridFutureAdapter<>(); ++ curFut.listen(signalC); + } + } + + if (entries0 != null) + submit(entries0, curFut0); + + // Create compound future for this flush. + GridCompoundFuture<Object, Object> res = null; + + for (IgniteInternalFuture<Object> f : locFuts) { + if (res == null) - res = new GridCompoundFuture<>(ctx); ++ res = new GridCompoundFuture<>(); + + res.add(f); + } + + for (IgniteInternalFuture<Object> f : reqs.values()) { + if (res == null) - res = new GridCompoundFuture<>(ctx); ++ res = new GridCompoundFuture<>(); + + res.add(f); + } + + if (res != null) + res.markInitialized(); + + return res; + } + + /** + * Increments active tasks count. + * + * @throws IgniteInterruptedCheckedException If thread has been interrupted. + */ + private void incrementActiveTasks() throws IgniteInterruptedCheckedException { + U.acquire(sem); + } + + /** + * @param f Future that finished. + */ + private void signalTaskFinished(IgniteInternalFuture<Object> f) { + assert f != null; + + sem.release(); + } + + /** + * @param entries Entries to submit. + * @param curFut Current future. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut) + throws IgniteInterruptedCheckedException { + assert entries != null; + assert !entries.isEmpty(); + assert curFut != null; + + incrementActiveTasks(); + + IgniteInternalFuture<Object> fut; + + if (isLocNode) { + fut = ctx.closure().callLocalSafe( + new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false); + + locFuts.add(fut); + - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() { ++ fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { + try { + boolean rmv = locFuts.remove(t); + + assert rmv; + + curFut.onDone(t.get()); + } + catch (IgniteCheckedException e) { + curFut.onDone(e); + } + } + }); + } + else { + byte[] entriesBytes; + + try { + if (compact) { + entriesBytes = ctx.config().getMarshaller() + .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null)); + } + else + entriesBytes = ctx.config().getMarshaller().marshal(entries); + + if (updaterBytes == null) { + assert updater != null; + + updaterBytes = ctx.config().getMarshaller().marshal(updater); + } + + if (topicBytes == null) + topicBytes = ctx.config().getMarshaller().marshal(topic); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal (request will not be sent).", e); + + return; + } + + GridDeployment dep = null; + GridPeerDeployAware jobPda0 = null; + + if (ctx.deploy().enabled()) { + try { + jobPda0 = jobPda; + + assert jobPda0 != null; + + dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader()); + + GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); + + if (cache != null) + cache.context().deploy().onEnter(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e); + + return; + } + + if (dep == null) + U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass()); + } + + long reqId = idGen.incrementAndGet(); + + fut = curFut; + + reqs.put(reqId, (GridFutureAdapter<Object>)fut); + + GridDataLoadRequest req = new GridDataLoadRequest( + reqId, + topicBytes, + cacheName, + updaterBytes, + entriesBytes, + true, + skipStore, + dep != null ? dep.deployMode() : null, + dep != null ? jobPda0.deployClass().getName() : null, + dep != null ? dep.userVersion() : null, + dep != null ? dep.participants() : null, + dep != null ? dep.classLoaderId() : null, + dep == null); + + try { + ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL); + + if (log.isDebugEnabled()) + log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); + } + catch (IgniteCheckedException e) { + if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) + ((GridFutureAdapter<Object>)fut).onDone(e); + else + ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " + + "request (node has left): " + node.id())); + } + } + } + + /** + * + */ + void onNodeLeft() { + assert !isLocNode; + assert bufMappings.get(node.id()) != this; + + if (log.isDebugEnabled()) + log.debug("Forcibly completing futures (node has left): " + node.id()); + + Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + node.id()); + + for (GridFutureAdapter<Object> f : reqs.values()) + f.onDone(e); + + // Make sure to complete current future. + GridFutureAdapter<Object> curFut0; + + synchronized (this) { + curFut0 = curFut; + } + + curFut0.onDone(e); + } + + /** + * @param res Response. + */ + void onResponse(GridDataLoadResponse res) { + if (log.isDebugEnabled()) + log.debug("Received data load response: " + res); + + GridFutureAdapter<?> f = reqs.remove(res.requestId()); + + if (f == null) { + if (log.isDebugEnabled()) + log.debug("Future for request has not been found: " + res.requestId()); + + return; + } + + Throwable err = null; + + byte[] errBytes = res.errorBytes(); + + if (errBytes != null) { + try { + GridPeerDeployAware jobPda0 = jobPda; + + err = ctx.config().getMarshaller().unmarshal( + errBytes, + jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader()); + } + catch (IgniteCheckedException e) { + f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); + + return; + } + } + + f.onDone(null, err); + + if (log.isDebugEnabled()) + log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']'); + } + + /** + * + */ + void cancelAll() { + IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this); + + for (IgniteInternalFuture<?> f : locFuts) { + try { + f.cancel(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to cancel mini-future.", e); + } + } + + for (GridFutureAdapter<?> f : reqs.values()) + f.onDone(err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + int size; + + synchronized (this) { + size = entries.size(); + } + + return S.toString(Buffer.class, this, + "entriesCnt", size, + "locFutsSize", locFuts.size(), + "reqsSize", reqs.size()); + } + } + + /** + * Data streamer peer-deploy aware. + */ + private class DataStreamerPda implements GridPeerDeployAware { + /** */ + private static final long serialVersionUID = 0L; + + /** Deploy class. */ + private Class<?> cls; + + /** Class loader. */ + private ClassLoader ldr; + + /** Collection of objects to detect deploy class and class loader. */ + private Collection<Object> objs; + + /** + * Constructs data streamer peer-deploy aware. + * + * @param objs Collection of objects to detect deploy class and class loader. + */ + private DataStreamerPda(Object... objs) { + this.objs = Arrays.asList(objs); + } + + /** {@inheritDoc} */ + @Override public Class<?> deployClass() { + if (cls == null) { + Class<?> cls0 = null; + + if (depCls != null) + cls0 = depCls; + else { + for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) { + Object o = it.next(); + + if (o != null) + cls0 = U.detectClass(o); + } + + if (cls0 == null || U.isJdk(cls0)) + cls0 = IgniteDataStreamerImpl.class; + } + + assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']'; + + cls = cls0; + } + + return cls; + } + + /** {@inheritDoc} */ + @Override public ClassLoader classLoader() { + if (ldr == null) { + ClassLoader ldr0 = deployClass().getClassLoader(); + + // Safety. + if (ldr0 == null) + ldr0 = U.gridClassLoader(); + + assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']'; + + ldr = ldr0; + } + + return ldr; + } + } + + /** + * Entry. + */ + private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private K key; + + /** */ + private V val; + + /** + * @param key Key. + * @param val Value. + */ + private Entry0(K key, @Nullable V val) { + assert key != null; + + this.key = key; + this.val = val; + } + + /** + * For {@link Externalizable}. + */ + @SuppressWarnings("UnusedDeclaration") + public Entry0() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return key; + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return val; + } + + /** {@inheritDoc} */ + @Override public V setValue(V val) { + V old = this.val; + + this.val = val; + + return old; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(key); + out.writeObject(val); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + key = (K)in.readObject(); + val = (V)in.readObject(); + } + } + + /** + * Wrapper list with special compact serialization of map entries. + */ + private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Wrapped delegate. */ + private Collection<Map.Entry<K, V>> delegate; + + /** Optional portable processor for converting values. */ + private GridPortableProcessor portable; + + /** + * @param delegate Delegate. + * @param portable Portable processor. + */ + private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) { + this.delegate = delegate; + this.portable = portable; + } + + /** + * For {@link Externalizable}. + */ + public Entries0() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Iterator<Entry<K, V>> iterator() { + return delegate.iterator(); + } + + /** {@inheritDoc} */ + @Override public int size() { + return delegate.size(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(delegate.size()); + + boolean portableEnabled = portable != null; + + for (Map.Entry<K, V> entry : delegate) { + if (portableEnabled) { + out.writeObject(portable.marshalToPortable(entry.getKey())); + out.writeObject(portable.marshalToPortable(entry.getValue())); + } + else { + out.writeObject(entry.getKey()); + out.writeObject(entry.getValue()); + } + } + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + int sz = in.readInt(); + + delegate = new ArrayList<>(sz); + + for (int i = 0; i < sz; i++) { + Object k = in.readObject(); + Object v = in.readObject(); + + delegate.add(new Entry0<>((K)k, (V)v)); + } + } + } + + /** + * Isolated updater which only loads entry initial value. + */ + private static class IsolatedUpdater<K, V> implements Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache; + + GridCacheAdapter<K, V> internalCache = proxy.context().cache(); + + if (internalCache.isNear()) + internalCache = internalCache.context().near().dht(); + + GridCacheContext<K, V> cctx = internalCache.context(); + + long topVer = cctx.affinity().affinityTopologyVersion(); + + GridCacheVersion ver = cctx.versions().next(topVer); + + boolean portable = cctx.portableEnabled(); + + for (Map.Entry<K, V> e : entries) { + try { + K key = e.getKey(); + V val = e.getValue(); + + if (portable) { + key = (K)cctx.marshalToPortable(key); + val = (V)cctx.marshalToPortable(val); + } + + GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer); + + entry.unswap(true, false); + + entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer, + GridDrType.DR_LOAD); + + cctx.evicts().touch(entry, topVer); + } + catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { + // No-op. + } + catch (IgniteCheckedException ex) { + IgniteLogger log = cache.unwrap(Ignite.class).log(); + + U.error(log, "Failed to set initial value for cache entry: " + e, ex); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java index 9dd4a8e,0000000..2934153 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java @@@ -1,316 -1,0 +1,316 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastream; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.thread.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + +/** + * + */ +public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter { + /** Loaders map (access is not supposed to be highly concurrent). */ + private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>(); + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Flushing thread. */ + private Thread flusher; + + /** */ + private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>(); + + /** Marshaller. */ + private final Marshaller marsh; + + /** + * @param ctx Kernal context. + */ + public IgniteDataStreamerProcessor(GridKernalContext ctx) { + super(ctx); + + ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof GridDataLoadRequest; + + processDataLoadRequest(nodeId, (GridDataLoadRequest)msg); + } + }); + + marsh = ctx.config().getMarshaller(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; + + flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) { + @Override protected void body() throws InterruptedException { + while (!isCancelled()) { + IgniteDataStreamerImpl<K, V> ldr = flushQ.take(); + + if (!busyLock.enterBusy()) + return; + + try { + if (ldr.isClosed()) + continue; + + ldr.tryFlush(); + + flushQ.offer(ldr); + } + finally { + busyLock.leaveBusy(); + } + } + } + }); + + flusher.start(); + + if (log.isDebugEnabled()) + log.debug("Started data streamer processor."); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + if (ctx.config().isDaemon()) + return; + + ctx.io().removeMessageListener(TOPIC_DATALOAD); + + busyLock.block(); + + U.interrupt(flusher); + U.join(flusher, log); + + for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) { + if (log.isDebugEnabled()) + log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']'); + + try { + ldr.closeEx(cancel); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close data streamer: " + ldr, e); + } + } + + if (log.isDebugEnabled()) + log.debug("Stopped data streamer processor."); + } + + /** + * @param cacheName Cache name ({@code null} for default cache). + * @param compact {@code true} if data streamer should transfer data in compact format. + * @return Data streamer. + */ + public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to create data streamer (grid is stopping)."); + + try { + final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact); + + ldrs.add(ldr); + - ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() { ++ ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + boolean b = ldrs.remove(ldr); + + assert b : "Loader has not been added to set: " + ldr; + + if (log.isDebugEnabled()) + log.debug("Loader has been completed: " + ldr); + } + }); + + return ldr; + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param cacheName Cache name ({@code null} for default cache). + * @return Data streamer. + */ + public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) { + return dataStreamer(cacheName, true); + } + + /** + * @param nodeId Sender ID. + * @param req Request. + */ + private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) { + if (!busyLock.enterBusy()) { + if (log.isDebugEnabled()) + log.debug("Ignoring data load request (node is stopping): " + req); + + return; + } + + try { + if (log.isDebugEnabled()) + log.debug("Processing data load request: " + req); + + Object topic; + + try { + topic = marsh.unmarshal(req.responseTopicBytes(), null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal topic from request: " + req, e); + + return; + } + + ClassLoader clsLdr; + + if (req.forceLocalDeployment()) + clsLdr = U.gridClassLoader(); + else { + GridDeployment dep = ctx.deploy().getGlobalDeployment( + req.deploymentMode(), + req.sampleClassName(), + req.sampleClassName(), + req.userVersion(), + nodeId, + req.classLoaderId(), + req.participants(), + null); + + if (dep == null) { + sendResponse(nodeId, + topic, + req.requestId(), + new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + + ", req=" + req + ']'), + false); + + return; + } + + clsLdr = dep.classLoader(); + } + + Collection<Map.Entry<K, V>> col; + IgniteDataStreamer.Updater<K, V> updater; + + try { + col = marsh.unmarshal(req.collectionBytes(), clsLdr); + updater = marsh.unmarshal(req.updaterBytes(), clsLdr); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e); + + sendResponse(nodeId, topic, req.requestId(), e, false); + + return; + } + + IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx, + log, + req.cacheName(), + col, + req.ignoreDeploymentOwnership(), + req.skipStore(), + updater); + + Exception err = null; + + try { + job.call(); + } + catch (Exception e) { + U.error(log, "Failed to finish update job.", e); + + err = e; + } + + sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment()); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param nodeId Node ID. + * @param resTopic Response topic. + * @param reqId Request ID. + * @param err Error. + * @param forceLocDep Force local deployment. + */ + private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, + boolean forceLocDep) { + byte[] errBytes; + + try { + errBytes = err != null ? marsh.marshal(err) : null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal message.", e); + + return; + } + + GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep); + + try { + ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + if (ctx.discovery().alive(nodeId)) + U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e); + else if (log.isDebugEnabled()) + log.debug("Node has left the grid: " + nodeId); + } + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + X.println(">>>"); + X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']'); + X.println(">>> ldrsSize: " + ldrs.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java ----------------------------------------------------------------------