Merge branch 'sprint-2' into ignite-394
Conflicts:
modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1de65eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1de65eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1de65eb0
Branch: refs/heads/ignite-394
Commit: 1de65eb06834f2df87b373bdd9e55371bb5bcbac
Parents: 1f2b802 40b96e1
Author: Artem Shutak <[email protected]>
Authored: Tue Mar 10 13:05:10 2015 +0300
Committer: Artem Shutak <[email protected]>
Committed: Tue Mar 10 13:05:10 2015 +0300
----------------------------------------------------------------------
DISCLAIMER.txt | 15 +
README.txt | 25 +
RELEASE_NOTES.txt | 18 +
assembly/release-base.xml | 20 +-
assembly/release-fabric.xml | 5 -
bin/ignitevisorcmd.bat | 7 +-
bin/ignitevisorcmd.sh | 7 +-
bin/include/target-classpath.bat | 2 +-
bin/include/target-classpath.sh | 2 +-
docs/ignite_readme.md | 100 --
docs/ignite_readme.pdf | Bin 77136 -> 0 bytes
docs/release_notes.md | 16 -
docs/release_notes.pdf | Bin 33174 -> 0 bytes
docs/wiki/basic-concepts/async-support.md | 75 -
docs/wiki/basic-concepts/getting-started.md | 218 ---
docs/wiki/basic-concepts/ignite-life-cycel.md | 105 --
docs/wiki/basic-concepts/maven-setup.md | 68 -
docs/wiki/basic-concepts/what-is-ignite.md | 31 -
docs/wiki/basic-concepts/zero-deployment.md | 56 -
docs/wiki/clustering/aws-config.md | 42 -
docs/wiki/clustering/cluster-config.md | 176 --
docs/wiki/clustering/cluster-groups.md | 210 ---
docs/wiki/clustering/cluster.md | 128 --
docs/wiki/clustering/leader-election.md | 59 -
docs/wiki/clustering/network-config.md | 101 --
docs/wiki/clustering/node-local-map.md | 35 -
docs/wiki/compute-grid/checkpointing.md | 238 ---
.../compute-grid/collocate-compute-and-data.md | 29 -
docs/wiki/compute-grid/compute-grid.md | 56 -
docs/wiki/compute-grid/compute-tasks.md | 105 --
docs/wiki/compute-grid/distributed-closures.md | 107 --
docs/wiki/compute-grid/executor-service.md | 23 -
docs/wiki/compute-grid/fault-tolerance.md | 79 -
docs/wiki/compute-grid/job-scheduling.md | 69 -
docs/wiki/compute-grid/load-balancing.md | 59 -
docs/wiki/data-grid/affinity-collocation.md | 78 -
docs/wiki/data-grid/automatic-db-integration.md | 102 --
docs/wiki/data-grid/cache-modes.md | 237 ---
docs/wiki/data-grid/cache-queries.md | 164 --
docs/wiki/data-grid/data-grid.md | 68 -
docs/wiki/data-grid/data-loading.md | 77 -
docs/wiki/data-grid/evictions.md | 86 -
docs/wiki/data-grid/hibernate-l2-cache.md | 173 --
docs/wiki/data-grid/jcache.md | 99 --
docs/wiki/data-grid/off-heap-memory.md | 180 --
docs/wiki/data-grid/persistent-store.md | 111 --
docs/wiki/data-grid/rebalancing.md | 105 --
docs/wiki/data-grid/transactions.md | 127 --
docs/wiki/data-grid/web-session-clustering.md | 236 ---
.../distributed-data-structures/atomic-types.md | 97 -
.../countdownlatch.md | 24 -
.../distributed-data-structures/id-generator.md | 40 -
.../queue-and-set.md | 116 --
.../distributed-events/automatic-batching.md | 16 -
docs/wiki/distributed-events/events.md | 101 --
docs/wiki/distributed-file-system/igfs.md | 1 -
docs/wiki/distributed-messaging/messaging.md | 73 -
docs/wiki/http/configuration.md | 58 -
docs/wiki/http/rest-api.md | 1646 -----------------
docs/wiki/release-notes/release-notes.md | 13 -
docs/wiki/service-grid/cluster-singletons.md | 94 -
docs/wiki/service-grid/service-configuration.md | 33 -
docs/wiki/service-grid/service-example.md | 94 -
docs/wiki/service-grid/service-grid.md | 62 -
.../ComputeFibonacciContinuationExample.java | 12 +-
.../examples/datagrid/CacheApiExample.java | 2 +-
.../examples/ScalarContinuationExample.scala | 12 +-
.../client/suite/IgniteClientTestSuite.java | 3 +-
.../org/apache/ignite/IgniteFileSystem.java | 24 +-
.../apache/ignite/IgniteSystemProperties.java | 35 +-
.../org/apache/ignite/cache/CacheManager.java | 1 -
.../igfs/IgfsDirectoryNotEmptyException.java | 42 +
.../ignite/igfs/IgfsFileNotFoundException.java | 44 -
.../ignite/igfs/IgfsPathNotFoundException.java | 44 +
.../igfs/secondary/IgfsSecondaryFileSystem.java | 10 +-
.../internal/ComputeTaskInternalFuture.java | 44 +-
.../ignite/internal/GridJobContextImpl.java | 6 +-
.../ignite/internal/GridJobSessionImpl.java | 2 +-
.../ignite/internal/GridKernalGatewayImpl.java | 26 -
.../ignite/internal/GridTaskSessionImpl.java | 2 +-
.../ignite/internal/IgniteInternalFuture.java | 79 +-
.../apache/ignite/internal/IgniteKernal.java | 24 +-
.../internal/client/GridClientFuture.java | 9 +-
.../client/impl/GridClientDataImpl.java | 2 +-
.../client/impl/GridClientFutureAdapter.java | 26 +-
.../connection/GridClientNioTcpConnection.java | 5 +-
.../impl/GridTcpRouterNioListenerAdapter.java | 2 +-
.../internal/cluster/IgniteClusterImpl.java | 9 +-
.../internal/executor/GridExecutorService.java | 2 +-
.../igfs/common/IgfsControlResponse.java | 5 +-
.../managers/communication/GridIoManager.java | 43 +-
.../discovery/GridDiscoveryManager.java | 10 +-
.../eventstorage/GridEventStorageManager.java | 6 +-
.../affinity/GridAffinityAssignmentCache.java | 17 +-
.../affinity/GridAffinityProcessor.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 91 +-
.../cache/GridCacheAffinityManager.java | 2 +-
.../cache/GridCacheDeploymentManager.java | 2 +-
.../cache/GridCacheEvictionManager.java | 27 +-
.../processors/cache/GridCacheGateway.java | 6 +
.../processors/cache/GridCacheIoManager.java | 77 +-
.../processors/cache/GridCacheMapEntry.java | 14 +-
.../processors/cache/GridCacheMessage.java | 7 -
.../cache/GridCacheMultiTxFuture.java | 54 +-
.../processors/cache/GridCacheMvcc.java | 3 +-
.../processors/cache/GridCacheMvccManager.java | 34 +-
.../GridCachePartitionExchangeManager.java | 6 +-
.../cache/GridCachePreloaderAdapter.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 2 +-
.../cache/GridCacheProjectionImpl.java | 12 +-
.../cache/GridCacheSharedContext.java | 2 +-
.../processors/cache/GridCacheUtils.java | 11 +-
.../processors/cache/IgniteCacheProxy.java | 2 +-
...ridCacheOptimisticCheckPreparedTxFuture.java | 25 +-
.../distributed/GridCacheTxFinishSync.java | 2 +-
.../GridDistributedCacheAdapter.java | 4 +-
.../GridDistributedTxRemoteAdapter.java | 6 +-
.../dht/GridDhtAssignmentFetchFuture.java | 22 +-
.../distributed/dht/GridDhtCacheAdapter.java | 9 +-
.../distributed/dht/GridDhtCacheEntry.java | 14 +-
.../distributed/dht/GridDhtEmbeddedFuture.java | 43 +-
.../distributed/dht/GridDhtFinishedFuture.java | 22 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 45 +-
.../distributed/dht/GridDhtLocalPartition.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 35 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 35 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 38 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 16 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 48 +-
.../dht/GridPartitionedGetFuture.java | 44 +-
.../dht/atomic/GridDhtAtomicCache.java | 20 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 20 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 43 +-
.../dht/colocated/GridDhtColocatedCache.java | 30 +-
.../colocated/GridDhtColocatedLockFuture.java | 43 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 43 +-
.../preloader/GridDhtPartitionDemandPool.java | 9 +-
.../GridDhtPartitionsExchangeFuture.java | 48 +-
.../dht/preloader/GridDhtPreloader.java | 22 +-
.../distributed/near/GridNearAtomicCache.java | 2 +-
.../distributed/near/GridNearCacheAdapter.java | 6 +-
.../distributed/near/GridNearGetFuture.java | 51 +-
.../distributed/near/GridNearLockFuture.java | 43 +-
.../near/GridNearTransactionalCache.java | 2 +-
.../near/GridNearTxFinishFuture.java | 32 +-
.../cache/distributed/near/GridNearTxLocal.java | 83 +-
.../near/GridNearTxPrepareFuture.java | 48 +-
.../processors/cache/local/GridLocalCache.java | 2 +-
.../cache/local/GridLocalLockFuture.java | 23 +-
.../processors/cache/local/GridLocalTx.java | 10 +-
.../cache/local/GridLocalTxFuture.java | 59 +-
.../local/atomic/GridLocalAtomicCache.java | 8 +-
.../GridCacheDistributedFieldsQueryFuture.java | 13 +-
.../query/GridCacheDistributedQueryFuture.java | 11 -
.../query/GridCacheDistributedQueryManager.java | 4 +-
.../query/GridCacheFieldsQueryErrorFuture.java | 53 -
.../query/GridCacheLocalFieldsQueryFuture.java | 13 +-
.../cache/query/GridCacheLocalQueryFuture.java | 15 +-
.../cache/query/GridCacheQueryErrorFuture.java | 5 +-
.../query/GridCacheQueryFutureAdapter.java | 14 +-
.../cache/query/GridCacheQueryManager.java | 11 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 25 +-
.../transactions/IgniteTxLocalAdapter.java | 320 ++--
.../cache/transactions/IgniteTxManager.java | 18 +-
.../transactions/TransactionProxyImpl.java | 2 +-
.../closure/GridClosureProcessor.java | 38 +-
.../continuous/GridContinuousProcessor.java | 36 +-
.../datastream/IgniteDataStreamerFuture.java | 13 +-
.../datastream/IgniteDataStreamerImpl.java | 39 +-
.../datastream/IgniteDataStreamerProcessor.java | 2 +-
.../GridCacheAtomicSequenceImpl.java | 4 +-
.../processors/hadoop/HadoopNoopProcessor.java | 2 +-
.../processors/igfs/IgfsDataManager.java | 36 +-
.../processors/igfs/IgfsDeleteWorker.java | 2 +-
.../igfs/IgfsDirectoryNotEmptyException.java | 44 -
.../internal/processors/igfs/IgfsImpl.java | 30 +-
.../processors/igfs/IgfsInputStreamImpl.java | 10 +-
.../processors/igfs/IgfsIpcHandler.java | 4 +-
.../processors/igfs/IgfsMetaManager.java | 22 +-
.../processors/igfs/IgfsOutputStreamImpl.java | 2 +-
.../internal/processors/igfs/IgfsServer.java | 3 +-
.../processors/job/GridJobProcessor.java | 4 +-
.../processors/query/GridQueryProcessor.java | 6 +-
.../processors/resource/GridResourceUtils.java | 4 +-
.../processors/rest/GridRestProcessor.java | 14 +-
.../handlers/cache/GridCacheCommandHandler.java | 6 +-
.../cache/GridCacheQueryCommandHandler.java | 4 +-
.../DataStructuresCommandHandler.java | 4 +-
.../handlers/task/GridTaskCommandHandler.java | 6 +-
.../top/GridTopologyCommandHandler.java | 4 +-
.../version/GridVersionCommandHandler.java | 2 +-
.../tcp/GridTcpMemcachedNioListener.java | 6 +-
.../protocols/tcp/GridTcpRestNioListener.java | 6 +-
.../service/GridServiceDeploymentFuture.java | 9 +-
.../service/GridServiceProcessor.java | 10 +-
.../GridStreamerStageExecutionFuture.java | 32 +-
.../processors/streamer/IgniteStreamerImpl.java | 23 +-
.../internal/util/GridSerializableFuture.java | 28 -
.../ignite/internal/util/GridThreadLocal.java | 175 --
.../ignite/internal/util/GridThreadLocalEx.java | 210 ---
.../ignite/internal/util/IgniteUtils.java | 36 +-
.../util/future/GridCompoundFuture.java | 52 +-
.../util/future/GridCompoundIdentityFuture.java | 18 +-
.../util/future/GridEmbeddedFuture.java | 77 +-
.../util/future/GridFinishedFuture.java | 158 +-
.../util/future/GridFinishedFutureEx.java | 197 ---
.../internal/util/future/GridFutureAdapter.java | 365 +---
.../util/future/GridFutureAdapterEx.java | 517 ------
.../util/future/GridFutureChainListener.java | 18 +-
.../util/future/IgniteFinishedFutureImpl.java | 27 +-
.../util/future/IgniteFinishedFutureImplEx.java | 30 -
.../internal/util/future/IgniteFutureImpl.java | 31 +-
.../internal/util/io/GridFilenameUtils.java | 2 +-
.../ignite/internal/util/lang/GridFunc.java | 90 +-
.../internal/util/lang/GridPlainFuture.java | 79 -
.../util/lang/GridPlainFutureAdapter.java | 299 ----
.../util/nio/GridNioEmbeddedFuture.java | 12 +-
.../util/nio/GridNioFinishedFuture.java | 77 +-
.../ignite/internal/util/nio/GridNioFuture.java | 84 +-
.../internal/util/nio/GridNioFutureImpl.java | 282 +--
.../ignite/internal/util/nio/GridNioServer.java | 7 +-
.../util/nio/GridTcpNioCommunicationClient.java | 18 +-
.../ignite/internal/util/worker/GridWorker.java | 27 -
.../internal/util/worker/GridWorkerFuture.java | 20 -
.../visor/cache/VisorCacheClearTask.java | 2 +-
.../visor/node/VisorGridConfiguration.java | 6 +-
.../org/apache/ignite/lang/IgniteFuture.java | 67 +-
.../lang/IgniteFutureCancelledException.java | 3 -
.../lang/IgniteFutureTimeoutException.java | 3 -
.../communication/tcp/TcpCommunicationSpi.java | 32 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
.../igfs/IgfsFragmentizerAbstractSelfTest.java | 4 +-
.../internal/GridMultipleJobsSelfTest.java | 2 +-
.../GridTaskFutureImplStopGridSelfTest.java | 2 +-
.../internal/GridTaskListenerSelfTest.java | 2 +-
.../GridCacheAsyncOperationsLimitSelfTest.java | 3 +-
...dCacheAtomicUsersAffinityMapperSelfTest.java | 7 +-
.../GridCacheFinishPartitionsSelfTest.java | 6 +-
.../GridCachePartitionedLocalStoreSelfTest.java | 7 -
...chePartitionedOffHeapLocalStoreSelfTest.java | 7 -
.../cache/GridCachePutAllFailoverSelfTest.java | 6 +-
.../GridCacheReferenceCleanupSelfTest.java | 2 +-
.../GridCacheReplicatedLocalStoreSelfTest.java | 7 -
...heReplicatedUsersAffinityMapperSelfTest.java | 7 +-
...ridCacheTxPartitionedLocalStoreSelfTest.java | 7 -
.../GridCacheTxUsersAffinityMapperSelfTest.java | 7 +-
.../distributed/GridCacheEventAbstractTest.java | 17 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 28 +-
.../igfs/IgfsDualAbstractSelfTest.java | 18 +-
.../cache/GridCacheCommandHandlerSelfTest.java | 42 +-
.../util/future/GridCompoundFutureSelfTest.java | 30 +-
.../util/future/GridEmbeddedFutureSelfTest.java | 13 +-
.../util/future/GridFinishedFutureSelfTest.java | 103 --
.../util/future/GridFutureAdapterSelfTest.java | 115 +-
.../future/GridFutureListenPerformanceTest.java | 22 +-
.../util/future/IgniteFutureImplTest.java | 99 +-
.../util/future/nio/GridNioFutureSelfTest.java | 8 +-
.../lang/GridFutureListenPerformanceTest.java | 2 +-
.../loadtests/colocation/GridTestMain.java | 2 +-
...GridJobExecutionLoadTestClientSemaphore.java | 2 +-
...JobExecutionSingleNodeSemaphoreLoadTest.java | 2 +-
.../mergesort/GridMergeSortLoadTask.java | 2 +-
.../ignite/messaging/GridMessagingSelfTest.java | 12 +-
.../GridCacheStoreValueBytesTest.java | 4 +-
.../ignite/testframework/GridTestUtils.java | 13 +-
.../testsuites/IgniteLangSelfTestSuite.java | 1 -
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 10 +-
.../processors/hadoop/igfs/HadoopIgfsEx.java | 4 +-
.../hadoop/igfs/HadoopIgfsFuture.java | 4 +-
.../hadoop/igfs/HadoopIgfsInProc.java | 9 +-
.../hadoop/igfs/HadoopIgfsInputStream.java | 6 +-
.../processors/hadoop/igfs/HadoopIgfsIo.java | 6 +-
.../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 5 +-
.../hadoop/igfs/HadoopIgfsOutProc.java | 50 +-
.../processors/hadoop/igfs/HadoopIgfsUtils.java | 3 +-
.../hadoop/jobtracker/HadoopJobTracker.java | 20 +-
.../proto/HadoopProtocolJobStatusTask.java | 2 +-
.../hadoop/shuffle/HadoopShuffle.java | 4 +-
.../hadoop/shuffle/HadoopShuffleJob.java | 16 +-
.../external/HadoopExternalTaskExecutor.java | 28 +-
.../child/HadoopChildProcessRunner.java | 10 +-
.../HadoopExternalCommunication.java | 7 +-
.../HadoopTcpNioCommunicationClient.java | 12 +-
.../java/org/apache/ignite/igfs/IgfsLoad.java | 549 ------
.../ignite/loadtests/igfs/IgfsNodeStartup.java | 48 -
.../igfs/IgfsPerformanceBenchmark.java | 281 ---
.../processors/query/h2/IgniteH2Indexing.java | 2 +-
.../h2/twostep/GridReduceQueryExecutor.java | 8 +-
.../cache/jta/GridCacheXAResource.java | 9 +-
.../processors/schedule/ScheduleFutureImpl.java | 205 +--
.../schedule/GridScheduleSelfTest.java | 4 +-
.../ignite/visor/commands/VisorConsole.scala | 61 +-
.../visor/commands/ack/VisorAckCommand.scala | 4 +-
.../visor/commands/gc/VisorGcCommand.scala | 2 -
.../visor/commands/ping/VisorPingCommand.scala | 2 +-
.../scala/org/apache/ignite/visor/visor.scala | 34 +-
pom.xml | 33 +
.../basic-concepts/async-support.md | 92 +
.../basic-concepts/getting-started.md | 235 +++
.../basic-concepts/ignite-life-cycel.md | 122 ++
.../documentation/basic-concepts/maven-setup.md | 85 +
.../basic-concepts/what-is-ignite.md | 48 +
.../basic-concepts/zero-deployment.md | 73 +
wiki/documentation/clustering/aws-config.md | 59 +
wiki/documentation/clustering/cluster-config.md | 193 ++
wiki/documentation/clustering/cluster-groups.md | 227 +++
wiki/documentation/clustering/cluster.md | 145 ++
.../documentation/clustering/leader-election.md | 76 +
wiki/documentation/clustering/network-config.md | 118 ++
wiki/documentation/clustering/node-local-map.md | 52 +
.../documentation/compute-grid/checkpointing.md | 255 +++
.../compute-grid/collocate-compute-and-data.md | 46 +
wiki/documentation/compute-grid/compute-grid.md | 73 +
.../documentation/compute-grid/compute-tasks.md | 122 ++
.../compute-grid/distributed-closures.md | 124 ++
.../compute-grid/executor-service.md | 40 +
.../compute-grid/fault-tolerance.md | 96 +
.../compute-grid/job-scheduling.md | 86 +
.../compute-grid/load-balancing.md | 76 +
.../data-grid/affinity-collocation.md | 95 +
.../data-grid/automatic-db-integration.md | 119 ++
wiki/documentation/data-grid/cache-modes.md | 254 +++
wiki/documentation/data-grid/cache-queries.md | 181 ++
wiki/documentation/data-grid/data-grid.md | 85 +
wiki/documentation/data-grid/data-loading.md | 94 +
wiki/documentation/data-grid/evictions.md | 103 ++
.../data-grid/hibernate-l2-cache.md | 190 ++
wiki/documentation/data-grid/jcache.md | 116 ++
wiki/documentation/data-grid/off-heap-memory.md | 197 +++
.../documentation/data-grid/persistent-store.md | 128 ++
wiki/documentation/data-grid/rebalancing.md | 122 ++
wiki/documentation/data-grid/transactions.md | 144 ++
.../data-grid/web-session-clustering.md | 253 +++
.../distributed-data-structures/atomic-types.md | 114 ++
.../countdownlatch.md | 41 +
.../distributed-data-structures/id-generator.md | 57 +
.../queue-and-set.md | 133 ++
.../distributed-events/automatic-batching.md | 33 +
wiki/documentation/distributed-events/events.md | 118 ++
.../distributed-file-system/igfs.md | 18 +
.../distributed-messaging/messaging.md | 90 +
wiki/documentation/http/configuration.md | 67 +
wiki/documentation/http/rest-api.md | 1663 ++++++++++++++++++
.../release-notes/release-notes.md | 30 +
.../service-grid/cluster-singletons.md | 111 ++
.../service-grid/service-configuration.md | 50 +
.../service-grid/service-example.md | 111 ++
wiki/documentation/service-grid/service-grid.md | 79 +
wiki/licence-prepender.sh | 51 +
351 files changed, 9001 insertions(+), 12888 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
index b6aa15c,0000000..8c29898
mode 100644,000000..100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
@@@ -1,75 -1,0 +1,66 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
- import java.io.*;
-
+/**
+ * Data streamer future.
+ */
+class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Data streamer. */
++ /** Data loader. */
+ @GridToStringExclude
+ private IgniteDataStreamerImpl dataLdr;
+
+ /**
+ * Default constructor for {@link Externalizable} support.
+ */
+ public IgniteDataStreamerFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Context.
+ * @param dataLdr Data streamer.
+ */
- IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl
dataLdr) {
- super(ctx);
-
++ IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataLoaderImpl
dataLdr) {
+ assert dataLdr != null;
+
+ this.dataLdr = dataLdr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() throws IgniteCheckedException {
- checkValid();
-
+ if (onCancelled()) {
+ dataLdr.closeEx(true);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteDataStreamerFuture.class, this,
super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
index faba034,0000000..7867c28
mode 100644,000000..100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
@@@ -1,1469 -1,0 +1,1470 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.internal.processors.portable.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.Map.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Data streamer implementation.
+ */
+@SuppressWarnings("unchecked")
+public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed {
+ /** Isolated updater. */
+ private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+
+ /** Cache updater. */
+ private Updater<K, V> updater = ISOLATED_UPDATER;
+
+ /** */
+ private byte[] updaterBytes;
+
+ /** Max remap count before issuing an error. */
+ private static final int DFLT_MAX_REMAP_CNT = 32;
+
+ /** Log reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new
AtomicReference<>();
+
++ /** Logger. */
++ private static IgniteLogger log;
++
+ /** Cache name ({@code null} for default cache). */
+ private final String cacheName;
+
+ /** Portable enabled flag. */
+ private final boolean portableEnabled;
+
+ /**
+ * If {@code true} then data will be transferred in compact format (only
keys and values).
+ * Otherwise full map entry will be transferred (this is required by DR
internal logic).
+ */
+ private final boolean compact;
+
+ /** Per-node buffer size. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+
+ /** */
+ private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+
+ /** */
+ private long autoFlushFreq;
+
+ /** Mapping. */
+ @GridToStringInclude
+ private ConcurrentMap<UUID, Buffer> bufMappings = new
ConcurrentHashMap8<>();
+
- /** Logger. */
- private final IgniteLogger log;
-
+ /** Discovery listener. */
+ private final GridLocalEventListener discoLsnr;
+
+ /** Context. */
+ private final GridKernalContext ctx;
+
+ /** Communication topic for responses. */
+ private final Object topic;
+
+ /** */
+ private byte[] topicBytes;
+
- /** {@code True} if data streamer has been cancelled. */
++ /** {@code True} if data loader has been cancelled. */
+ private volatile boolean cancelled;
+
- /** Active futures of this data streamer. */
++ /** Active futures of this data loader. */
+ @GridToStringInclude
+ private final Collection<IgniteInternalFuture<?>> activeFuts = new
GridConcurrentHashSet<>();
+
+ /** Closure to remove from active futures. */
+ @GridToStringExclude
+ private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new
IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> t) {
+ boolean rmv = activeFuts.remove(t);
+
+ assert rmv;
+ }
+ };
+
+ /** Job peer deploy aware. */
+ private volatile GridPeerDeployAware jobPda;
+
+ /** Deployment class. */
+ private Class<?> depCls;
+
+ /** Future to track loading finish. */
+ private final GridFutureAdapter<?> fut;
+
+ /** Public API future to track loading finish. */
+ private final IgniteFuture<?> publicFut;
+
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ /** */
+ private volatile long lastFlushTime = U.currentTimeMillis();
+
+ /** */
+ private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
+
+ /** */
+ private boolean skipStore;
+
+ /** */
+ private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+
+ /** Whether the warning at {@link IgniteDataStreamerImpl#allowOverwrite()}
has already been printed. */
+ private static boolean isWarningPrinted;
+
+ /**
+ * @param ctx Grid kernal context.
+ * @param cacheName Cache name.
+ * @param flushQ Flush queue.
+ * @param compact If {@code true} data is transferred in compact mode
(only keys and values).
+ * Otherwise full map entry will be transferred (this is
required by DR internal logic).
+ */
+ public IgniteDataStreamerImpl(
+ final GridKernalContext ctx,
+ @Nullable final String cacheName,
+ DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
+ boolean compact
+ ) {
+ assert ctx != null;
+
+ this.ctx = ctx;
+ this.cacheName = cacheName;
+ this.flushQ = flushQ;
+ this.compact = compact;
+
- log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
++ if (log == null)
++ log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
+
+ ClusterNode node =
F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+ if (node == null)
+ throw new IllegalStateException("Cache doesn't exist: " +
cacheName);
+
+ portableEnabled = ctx.portable().portableEnabled(node, cacheName);
+
+ discoLsnr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt.type() == EVT_NODE_FAILED || evt.type() ==
EVT_NODE_LEFT;
+
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ UUID id = discoEvt.eventNode().id();
+
+ // Remap regular mappings.
+ final Buffer buf = bufMappings.remove(id);
+
+ if (buf != null) {
+ // Only async notification is possible since
+ // discovery thread may be trapped otherwise.
+ ctx.closure().callLocalSafe(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ buf.onNodeLeft();
+
+ return null;
+ }
+ },
+ true /* system pool */
+ );
+ }
+ }
+ };
+
+ ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED,
EVT_NODE_LEFT);
+
+ // Generate unique topic for this loader.
+ topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+
+ ctx.io().addMessageListener(topic, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof GridDataLoadResponse;
+
+ GridDataLoadResponse res = (GridDataLoadResponse)msg;
+
+ if (log.isDebugEnabled())
+ log.debug("Received data load response: " + res);
+
+ Buffer buf = bufMappings.get(nodeId);
+
+ if (buf != null)
+ buf.onResponse(res);
+
+ else if (log.isDebugEnabled())
+ log.debug("Ignoring response since node has left
[nodeId=" + nodeId + ", ");
+ }
+ });
+
+ if (log.isDebugEnabled())
+ log.debug("Added response listener within topic: " + topic);
+
+ fut = new IgniteDataStreamerFuture(ctx, this);
+
+ publicFut = new IgniteFutureImpl<>(fut);
+ }
+
+ /**
+ * Enters busy lock.
+ */
+ private void enterBusy() {
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Data streamer has been closed.");
+ }
+
+ /**
+ * Leaves busy lock.
+ */
+ private void leaveBusy() {
+ busyLock.leaveBusy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> future() {
+ return publicFut;
+ }
+
+ /**
+ * @return Internal future.
+ */
+ public IgniteInternalFuture<?> internalFuture() {
+ return fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deployClass(Class<?> depCls) {
+ this.depCls = depCls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updater(Updater<K, V> updater) {
+ A.notNull(updater, "updater");
+
+ this.updater = updater;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Overwrite is considered allowed whenever the current updater is not {@code ISOLATED_UPDATER}.
+  * The first time it is found to be disallowed, a warning is logged once.
+  *
+  * @return {@code true} if the current updater is not the isolated one.
+  */
+ @Override public boolean allowOverwrite() {
+     if (updater != ISOLATED_UPDATER)
+         return true;
+
+     // Overwrite is off: print the notice a single time, re-checking the flag under the monitor.
+     if (!isWarningPrinted) {
+         synchronized (this) {
+             if (!isWarningPrinted) {
+                 log.warning("Data streamer will not overwrite existing cache entries for better performance " +
+                     "(to change, set allowOverwrite to true)");
+
+                 isWarningPrinted = true;
+             }
+         }
+     }
+
+     return false;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Does nothing when the requested mode equals the current one. Otherwise requires at least one node
+  * hosting the cache and switches the updater: the individual updater when overwrite is allowed,
+  * {@code ISOLATED_UPDATER} otherwise.
+  *
+  * @param allow Requested overwrite mode.
+  * @throws IgniteException If no node for the cache is found.
+  */
+ @Override public void allowOverwrite(boolean allow) {
+     if (allowOverwrite() == allow)
+         return;
+
+     ClusterNode cacheNode = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+     if (cacheNode == null)
+         throw new IgniteException("Failed to get node for cache: " + cacheName);
+
+     if (allow)
+         updater = IgniteDataStreamerCacheUpdaters.<K, V>individual();
+     else
+         updater = ISOLATED_UPDATER;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return Current value of the {@code skipStore} flag.
+  */
+ @Override public boolean skipStore() {
+     return skipStore;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @param skipStore New value of the {@code skipStore} flag.
+  */
+ @Override public void skipStore(boolean skipStore) {
+     this.skipStore = skipStore;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return Name of the cache this streamer loads into (may be {@code null}).
+  */
+ @Override @Nullable public String cacheName() {
+     return cacheName;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return Current per-node buffer size ({@code bufSize}).
+  */
+ @Override public int perNodeBufferSize() {
+     return bufSize;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @param bufSize New per-node buffer size; must be positive.
+  */
+ @Override public void perNodeBufferSize(int bufSize) {
+     A.ensure(bufSize > 0, "bufSize > 0");
+
+     this.bufSize = bufSize;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return Current value of {@code parallelOps}.
+  */
+ @Override public int perNodeParallelStreamOperations() {
+     return parallelOps;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The value is stored as is, without validation.
+  *
+  * @param parallelOps New value of {@code parallelOps}.
+  */
+ @Override public void perNodeParallelStreamOperations(int parallelOps) {
+     this.parallelOps = parallelOps;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return Current automatic flush frequency ({@code 0} means the streamer is not queued for auto-flush).
+  */
+ @Override public long autoFlushFrequency() {
+     return autoFlushFreq;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Switching from zero to a non-zero frequency enqueues this streamer into the flush queue;
+  * switching to zero removes it from the queue. Setting the same value again is a no-op.
+  *
+  * @param autoFlushFreq New frequency; must not be negative.
+  */
+ @Override public void autoFlushFrequency(long autoFlushFreq) {
+     A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+
+     long prev = this.autoFlushFreq;
+
+     if (prev == autoFlushFreq)
+         return;
+
+     this.autoFlushFreq = autoFlushFreq;
+
+     if (autoFlushFreq == 0)
+         flushQ.remove(this);
+     else if (prev == 0)
+         flushQ.add(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Map<K, V> entries) throws
IllegalStateException {
+ A.notNull(entries, "entries");
+
+ return addData(entries.entrySet());
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Registers a result future among the active futures and hands the entries to {@code load0}.
+  * When more than one entry is passed, the set of their keys is tracked so the result future
+  * completes only after every key has been processed. An {@link IgniteException} raised while
+  * loading is returned as an already finished future.
+  *
+  * @param entries Entries to add; must not be empty.
+  */
+ @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+     A.notEmpty(entries, "entries");
+
+     enterBusy();
+
+     try {
-         GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
++         GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+
-         resFut.listenAsync(rmvActiveFut);
++         resFut.listen(rmvActiveFut);
+
+         activeFuts.add(resFut);
+
+         // Keys are tracked only for multi-entry batches; a single entry completes the future directly.
+         Collection<K> trackedKeys = null;
+
+         if (entries.size() > 1) {
+             trackedKeys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+             for (Map.Entry<K, V> e : entries)
+                 trackedKeys.add(e.getKey());
+         }
+
+         load0(entries, resFut, trackedKeys, 0);
+
+         return new IgniteFutureImpl<>(resFut);
+     }
+     catch (IgniteException e) {
-         return new IgniteFinishedFutureImpl<>(ctx, e);
++         return new IgniteFinishedFutureImpl<>(e);
+     }
+     finally {
+         leaveBusy();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the single entry into a list and delegates to the collection-based overload.
+  *
+  * @param entry Entry to add; must not be {@code null}.
+  */
+ @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+     A.notNull(entry, "entry");
+
+     return addData(F.asList(entry));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(K key, V val) {
+ A.notNull(key, "key");
+
+ return addData(new Entry0<>(key, val));
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Implemented as adding the key with a {@code null} value.
+  *
+  * @param key Key to remove.
+  */
+ @Override public IgniteFuture<?> removeData(K key) {
+     return addData(key, null);
+ }
+
+ /**
+  * Maps each entry to its target node(s), appends the per-node batches to the corresponding node
+  * buffers and attaches a listener to every buffer future. The listener completes {@code resFut}
+  * on success and, on failure, either fails it (streamer cancelled or remap limit exceeded) or
+  * re-runs this method for the failed batch with an incremented remap counter.
+  *
+  * @param entries Entries.
+  * @param resFut Result future.
+  * @param activeKeys Active keys ({@code null} when a single entry is being loaded).
+  * @param remaps Remaps count.
+  */
+ private void load0(
+     Collection<? extends Map.Entry<K, V>> entries,
+     final GridFutureAdapter<Object> resFut,
+     @Nullable final Collection<K> activeKeys,
+     final int remaps
+ ) {
+     assert entries != null;
+
+     Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
+
+     // Peer-deploy info is created lazily from the first entry seen.
+     boolean initPda = ctx.deploy().enabled() && jobPda == null;
+
+     for (Map.Entry<K, V> entry : entries) {
+         List<ClusterNode> nodes;
+
+         try {
+             K key = entry.getKey();
+
+             assert key != null;
+
+             if (initPda) {
+                 jobPda = new DataStreamerPda(key, entry.getValue(), updater);
+
+                 initPda = false;
+             }
+
+             nodes = nodes(key);
+         }
+         catch (IgniteCheckedException e) {
+             resFut.onDone(e);
+
+             return;
+         }
+
+         if (F.isEmpty(nodes)) {
+             resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+                 "(no nodes with cache found in topology) [infos=" + entries.size() +
+                 ", cacheName=" + cacheName + ']'));
+
+             return;
+         }
+
+         for (ClusterNode node : nodes) {
+             Collection<Map.Entry<K, V>> col = mappings.get(node);
+
+             if (col == null)
+                 mappings.put(node, col = new ArrayList<>());
+
+             col.add(entry);
+         }
+     }
+
+     for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
+         final UUID nodeId = e.getKey().id();
+
+         Buffer buf = bufMappings.get(nodeId);
+
+         if (buf == null) {
+             Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+             // Another thread registered a buffer for this node first - use that one.
+             if (old != null)
+                 buf = old;
+         }
+
+         final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
+
+         IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+             @Override public void apply(IgniteInternalFuture<?> t) {
+                 try {
+                     t.get();
+
+                     if (activeKeys != null) {
+                         for (Map.Entry<K, V> e : entriesForNode)
+                             activeKeys.remove(e.getKey());
+
+                         if (activeKeys.isEmpty())
+                             resFut.onDone();
+                     }
+                     else {
+                         assert entriesForNode.size() == 1;
+
+                         // That has been a single key,
+                         // so complete result future right away.
+                         resFut.onDone();
+                     }
+                 }
+                 catch (IgniteCheckedException e1) {
+                     if (log.isDebugEnabled())
+                         log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+                     if (cancelled) {
+                         resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                             IgniteDataStreamerImpl.this, e1));
+                     }
+                     else if (remaps + 1 > maxRemapCnt) {
+                         // The cause belongs inside the exception: passing it as a second argument of
+                         // onDone() would make the descriptive exception the "result" and drop its message.
+                         resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+                             + remaps, e1));
+                     }
+                     else
+                         load0(entriesForNode, resFut, activeKeys, remaps + 1);
+                 }
+             }
+         };
+
+         GridFutureAdapter<?> f;
+
+         try {
+             f = buf.update(entriesForNode, lsnr);
+         }
+         catch (IgniteInterruptedCheckedException e1) {
+             resFut.onDone(e1);
+
+             return;
+         }
+
+         // The node may have left after the buffer was looked up: fail the buffer and the pending future.
+         if (ctx.discovery().node(nodeId) == null) {
+             if (bufMappings.remove(nodeId, buf))
+                 buf.onNodeLeft();
+
+             if (f != null)
+                 f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                     "(node has left): " + nodeId));
+         }
+     }
+ }
+
+ /**
+  * Resolves the nodes an entry with the given key has to be sent to: the primary and backups when
+  * overwrite is not allowed, otherwise the single node the key maps to.
+  *
+  * @param key Key to map.
+  * @return Nodes to send requests to.
+  * @throws IgniteCheckedException If failed.
+  */
+ private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
+     GridAffinityProcessor aff = ctx.affinity();
+
+     if (allowOverwrite())
+         return Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+
+     return aff.mapKeyToPrimaryAndBackups(cacheName, key);
+ }
+
+ /**
+  * Performs flush: snapshots the futures that are still active, then keeps flushing all node buffers
+  * until every future from the snapshot is done. Futures that are already done are checked with
+  * {@code get()}, so their failure is propagated to the caller.
+  *
+  * @throws IgniteCheckedException If failed.
+  */
+ private void doFlush() throws IgniteCheckedException {
+     lastFlushTime = U.currentTimeMillis();
+
+     List<IgniteInternalFuture> pending = null;
+
+     for (IgniteInternalFuture<?> f : activeFuts) {
+         if (f.isDone()) {
+             // Surface the error of an already finished operation, if any.
+             f.get();
+
+             continue;
+         }
+
+         if (pending == null)
+             pending = new ArrayList<>((int)(activeFuts.size() * 1.2));
+
+         pending.add(f);
+     }
+
+     // The snapshot list is created only when there is something to wait for.
+     if (pending == null || pending.isEmpty())
+         return;
+
+     for (;;) {
+         Queue<IgniteInternalFuture<?>> flushFuts = null;
+
+         for (Buffer buf : bufMappings.values()) {
+             IgniteInternalFuture<?> flushFut = buf.flush();
+
+             if (flushFut == null)
+                 continue;
+
+             if (flushFuts == null)
+                 flushFuts = new ArrayDeque<>(bufMappings.size() * 2);
+
+             flushFuts.add(flushFut);
+         }
+
+         if (flushFuts != null) {
+             assert !flushFuts.isEmpty();
+
+             boolean failed = false;
+
+             IgniteInternalFuture cur;
+
+             while ((cur = flushFuts.poll()) != null) {
+                 try {
+                     cur.get();
+                 }
+                 catch (IgniteCheckedException e) {
+                     if (log.isDebugEnabled())
+                         log.debug("Failed to flush buffer: " + e);
+
+                     failed = true;
+                 }
+             }
+
+             if (failed)
+                 // Remaps needed - flush buffers.
+                 continue;
+         }
+
+         int doneCnt = 0;
+
+         // Walk the snapshot in order and stop at the first future that is still running.
+         for (int i = 0; i < pending.size(); i++) {
+             IgniteInternalFuture f = pending.get(i);
+
+             if (f == null) {
+                 doneCnt++;
+
+                 continue;
+             }
+
+             if (!f.isDone())
+                 break;
+
+             f.get();
+
+             doneCnt++;
+
+             pending.set(i, null);
+         }
+
+         if (doneCnt == pending.size())
+             return;
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Runs {@code doFlush()} under the busy lock and converts a checked failure with
+  * {@code U.convertException}.
+  */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void flush() throws IgniteException {
+     enterBusy();
+
+     try {
+         doFlush();
+     }
+     catch (IgniteCheckedException checked) {
+         throw U.convertException(checked);
+     }
+     finally {
+         leaveBusy();
+     }
+ }
+
+ /**
+  * Triggers a flush of every internal buffer and records the flush time.
+  * <p>
+  * Does not wait for the flush results. Returns silently when the busy lock cannot be entered.
+  * The method is meant to be called periodically.
+  *
+  * @throws IgniteInterruptedException If the thread was interrupted while flushing a buffer.
+  */
+ @Override public void tryFlush() throws IgniteInterruptedException {
+     boolean entered = busyLock.enterBusy();
+
+     if (!entered)
+         return;
+
+     try {
+         for (Buffer nodeBuf : bufMappings.values())
+             nodeBuf.flush();
+
+         lastFlushTime = U.currentTimeMillis();
+     }
+     catch (IgniteInterruptedCheckedException interrupted) {
+         throw U.convertException(interrupted);
+     }
+     finally {
+         leaveBusy();
+     }
+ }
+
+ /**
+  * Closes the streamer through {@code closeEx(boolean)}, converting a checked failure with
+  * {@code U.convertException}.
+  *
+  * @param cancel {@code True} to close with cancellation.
+  * @throws IgniteException If failed.
+  */
+ @Override public void close(boolean cancel) throws IgniteException {
+     try {
+         closeEx(cancel);
+     }
+     catch (IgniteCheckedException checked) {
+         throw U.convertException(checked);
+     }
+ }
+
+ /**
+  * Closes the streamer once: blocks the busy lock, then either cancels all buffers or performs a
+  * final flush, unregisters the discovery and message listeners and completes the streamer future.
+  * Repeated calls are no-ops.
+  *
+  * @param cancel {@code True} to close with cancellation.
+  * @throws IgniteCheckedException If failed.
+  */
+ public void closeEx(boolean cancel) throws IgniteCheckedException {
+     if (!closed.compareAndSet(false, true))
+         return;
+
+     busyLock.block();
+
+     if (log.isDebugEnabled())
+         log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
+
+     IgniteCheckedException e = null;
+
+     try {
+         // Assuming that no methods are called on this loader after this method is called.
+         if (cancel) {
+             cancelled = true;
+
+             for (Buffer buf : bufMappings.values())
+                 buf.cancelAll();
+         }
+         else
+             doFlush();
+     }
+     catch (IgniteCheckedException e0) {
+         e = e0;
+     }
+     finally {
+         // Unregister listeners even when the final flush failed, otherwise they would stay
+         // registered for a streamer that is already closed.
+         ctx.event().removeLocalEventListener(discoLsnr);
+
+         ctx.io().removeMessageListener(topic);
+     }
+
+     fut.onDone(null, e);
+
+     if (e != null)
+         throw e;
+ }
+
+ /**
+  * Tells whether closing has finished, judged by completion of the streamer future.
+  *
+  * @return {@code true} If the loader is closed.
+  */
+ boolean isClosed() {
+     return fut.isDone();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Equivalent to {@code close(false)}, i.e. closing without cancellation.
+  */
+ @Override public void close() throws IgniteException {
+     close(false);
+ }
+
+ /**
+  * Returns the limit of remap attempts checked in {@code load0} before an operation is failed.
+  *
+  * @return Max remap count.
+  */
+ public int maxRemapCount() {
+     return maxRemapCnt;
+ }
+
+ /**
+  * Changes the limit of remap attempts. The value is stored without validation.
+  *
+  * @param maxRemapCnt New max remap count.
+  */
+ public void maxRemapCount(int maxRemapCnt) {
+     this.maxRemapCnt = maxRemapCnt;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The string is built by {@code S.toString} for this class.
+  */
+ @Override public String toString() {
+     return S.toString(IgniteDataStreamerImpl.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDelay(TimeUnit unit) {
+ return unit.convert(nextFlushTime() - U.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Computes the moment of the next automatic flush: last flush time plus the auto-flush frequency.
+  *
+  * @return Next flush time.
+  */
+ private long nextFlushTime() {
+     long last = lastFlushTime;
+
+     return last + autoFlushFreq;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Streamers are ordered by their next flush time. Equal flush times compare as {@code 0}, so the
+  * ordering honours the sign-symmetry required by {@link Comparable}. Any other {@link Delayed}
+  * is compared by its remaining delay instead of being cast blindly.
+  *
+  * @param o Object to compare with.
+  */
+ @Override public int compareTo(Delayed o) {
+     if (o == this)
+         return 0;
+
+     if (o instanceof IgniteDataStreamerImpl)
+         return Long.compare(nextFlushTime(), ((IgniteDataStreamerImpl)o).nextFlushTime());
+
+     return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ *
+ */
+ private class Buffer {
+ /** Node. */
+ private final ClusterNode node;
+
+ /** Active futures. */
+ private final Collection<IgniteInternalFuture<Object>> locFuts;
+
+ /** Buffered entries. */
+ private List<Map.Entry<K, V>> entries;
+
+ /** */
+ @GridToStringExclude
+ private GridFutureAdapter<Object> curFut;
+
+ /** Local node flag. */
+ private final boolean isLocNode;
+
+ /** ID generator. */
+ private final AtomicLong idGen = new AtomicLong();
+
+ /** Active futures. */
+ private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+
+ /** */
+ private final Semaphore sem;
+
+ /** Closure to signal on task finish. */
+ @GridToStringExclude
+ private final IgniteInClosure<IgniteInternalFuture<Object>> signalC =
new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ signalTaskFinished(t);
+ }
+ };
+
+ /**
+ * @param node Node.
+ */
+ Buffer(ClusterNode node) {
+ assert node != null;
+
+ this.node = node;
+
+ locFuts = new GridConcurrentHashSet<>();
+ reqs = new ConcurrentHashMap8<>();
+
+ // Cache local node flag.
+ isLocNode = node.equals(ctx.discovery().localNode());
+
+ entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
++ curFut = new GridFutureAdapter<>();
++ curFut.listen(signalC);
+
+ sem = new Semaphore(parallelOps);
+ }
+
+ /**
+ * @param newEntries Infos.
+ * @param lsnr Listener for the operation future.
+ * @throws IgniteInterruptedCheckedException If failed.
+ * @return Future for operation.
+ */
+ @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>>
newEntries,
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws
IgniteInterruptedCheckedException {
+ List<Map.Entry<K, V>> entries0 = null;
+ GridFutureAdapter<Object> curFut0;
+
+ synchronized (this) {
+ curFut0 = curFut;
+
- curFut0.listenAsync(lsnr);
++ curFut0.listen(lsnr);
+
+ for (Map.Entry<K, V> entry : newEntries)
+ entries.add(entry);
+
+ if (entries.size() >= bufSize) {
+ entries0 = entries;
+
+ entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
++ curFut = new GridFutureAdapter<>();
++ curFut.listen(signalC);
+ }
+ }
+
+ if (entries0 != null) {
+ submit(entries0, curFut0);
+
+ if (cancelled)
+ curFut0.onDone(new IgniteCheckedException("Data streamer
has been cancelled: " + IgniteDataStreamerImpl.this));
+ }
+
+ return curFut0;
+ }
+
+ /**
+ * @return Fresh collection with some space for outgrowth.
+ */
+ private List<Map.Entry<K, V>> newEntries() {
+ return new ArrayList<>((int)(bufSize * 1.2));
+ }
+
+ /**
+ * @return Future if any submitted.
+ *
+ * @throws IgniteInterruptedCheckedException If thread has been
interrupted.
+ */
+ @Nullable IgniteInternalFuture<?> flush() throws
IgniteInterruptedCheckedException {
+ List<Map.Entry<K, V>> entries0 = null;
+ GridFutureAdapter<Object> curFut0 = null;
+
+ synchronized (this) {
+ if (!entries.isEmpty()) {
+ entries0 = entries;
+ curFut0 = curFut;
+
+ entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
++ curFut = new GridFutureAdapter<>();
++ curFut.listen(signalC);
+ }
+ }
+
+ if (entries0 != null)
+ submit(entries0, curFut0);
+
+ // Create compound future for this flush.
+ GridCompoundFuture<Object, Object> res = null;
+
+ for (IgniteInternalFuture<Object> f : locFuts) {
+ if (res == null)
- res = new GridCompoundFuture<>(ctx);
++ res = new GridCompoundFuture<>();
+
+ res.add(f);
+ }
+
+ for (IgniteInternalFuture<Object> f : reqs.values()) {
+ if (res == null)
- res = new GridCompoundFuture<>(ctx);
++ res = new GridCompoundFuture<>();
+
+ res.add(f);
+ }
+
+ if (res != null)
+ res.markInitialized();
+
+ return res;
+ }
+
+ /**
+ * Increments active tasks count.
+ *
+ * @throws IgniteInterruptedCheckedException If thread has been
interrupted.
+ */
+ private void incrementActiveTasks() throws
IgniteInterruptedCheckedException {
+ U.acquire(sem);
+ }
+
+ /**
+ * @param f Future that finished.
+ */
+ private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+ assert f != null;
+
+ sem.release();
+ }
+
+ /**
+ * @param entries Entries to submit.
+ * @param curFut Current future.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private void submit(final Collection<Map.Entry<K, V>> entries, final
GridFutureAdapter<Object> curFut)
+ throws IgniteInterruptedCheckedException {
+ assert entries != null;
+ assert !entries.isEmpty();
+ assert curFut != null;
+
+ incrementActiveTasks();
+
+ IgniteInternalFuture<Object> fut;
+
+ if (isLocNode) {
+ fut = ctx.closure().callLocalSafe(
+ new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName,
entries, false, skipStore, updater), false);
+
+ locFuts.add(fut);
+
- fut.listenAsync(new
IgniteInClosure<IgniteInternalFuture<Object>>() {
++ fut.listen(new
IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object>
t) {
+ try {
+ boolean rmv = locFuts.remove(t);
+
+ assert rmv;
+
+ curFut.onDone(t.get());
+ }
+ catch (IgniteCheckedException e) {
+ curFut.onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ byte[] entriesBytes;
+
+ try {
+ if (compact) {
+ entriesBytes = ctx.config().getMarshaller()
+ .marshal(new Entries0<>(entries, portableEnabled
? ctx.portable() : null));
+ }
+ else
+ entriesBytes =
ctx.config().getMarshaller().marshal(entries);
+
+ if (updaterBytes == null) {
+ assert updater != null;
+
+ updaterBytes =
ctx.config().getMarshaller().marshal(updater);
+ }
+
+ if (topicBytes == null)
+ topicBytes =
ctx.config().getMarshaller().marshal(topic);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal (request will not be
sent).", e);
+
+ return;
+ }
+
+ GridDeployment dep = null;
+ GridPeerDeployAware jobPda0 = null;
+
+ if (ctx.deploy().enabled()) {
+ try {
+ jobPda0 = jobPda;
+
+ assert jobPda0 != null;
+
+ dep = ctx.deploy().deploy(jobPda0.deployClass(),
jobPda0.classLoader());
+
+ GridCacheAdapter<Object, Object> cache =
ctx.cache().internalCache(cacheName);
+
+ if (cache != null)
+ cache.context().deploy().onEnter();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to deploy class (request will
not be sent): " + jobPda0.deployClass(), e);
+
+ return;
+ }
+
+ if (dep == null)
+ U.warn(log, "Failed to deploy class (request will be
sent): " + jobPda0.deployClass());
+ }
+
+ long reqId = idGen.incrementAndGet();
+
+ fut = curFut;
+
+ reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+
+ GridDataLoadRequest req = new GridDataLoadRequest(
+ reqId,
+ topicBytes,
+ cacheName,
+ updaterBytes,
+ entriesBytes,
+ true,
+ skipStore,
+ dep != null ? dep.deployMode() : null,
+ dep != null ? jobPda0.deployClass().getName() : null,
+ dep != null ? dep.userVersion() : null,
+ dep != null ? dep.participants() : null,
+ dep != null ? dep.classLoaderId() : null,
+ dep == null);
+
+ try {
+ ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
+
+ if (log.isDebugEnabled())
+ log.debug("Sent request to node [nodeId=" + node.id()
+ ", req=" + req + ']');
+ }
+ catch (IgniteCheckedException e) {
+ if (ctx.discovery().alive(node) &&
ctx.discovery().pingNode(node.id()))
+ ((GridFutureAdapter<Object>)fut).onDone(e);
+ else
+ ((GridFutureAdapter<Object>)fut).onDone(new
ClusterTopologyCheckedException("Failed to send " +
+ "request (node has left): " + node.id()));
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ void onNodeLeft() {
+ assert !isLocNode;
+ assert bufMappings.get(node.id()) != this;
+
+ if (log.isDebugEnabled())
+ log.debug("Forcibly completing futures (node has left): " +
node.id());
+
+ Exception e = new ClusterTopologyCheckedException("Failed to wait
for request completion " +
+ "(node has left): " + node.id());
+
+ for (GridFutureAdapter<Object> f : reqs.values())
+ f.onDone(e);
+
+ // Make sure to complete current future.
+ GridFutureAdapter<Object> curFut0;
+
+ synchronized (this) {
+ curFut0 = curFut;
+ }
+
+ curFut0.onDone(e);
+ }
+
+ /**
+ * @param res Response.
+ */
+ void onResponse(GridDataLoadResponse res) {
+ if (log.isDebugEnabled())
+ log.debug("Received data load response: " + res);
+
+ GridFutureAdapter<?> f = reqs.remove(res.requestId());
+
+ if (f == null) {
+ if (log.isDebugEnabled())
+ log.debug("Future for request has not been found: " +
res.requestId());
+
+ return;
+ }
+
+ Throwable err = null;
+
+ byte[] errBytes = res.errorBytes();
+
+ if (errBytes != null) {
+ try {
+ GridPeerDeployAware jobPda0 = jobPda;
+
+ err = ctx.config().getMarshaller().unmarshal(
+ errBytes,
+ jobPda0 != null ? jobPda0.classLoader() :
U.gridClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ f.onDone(null, new IgniteCheckedException("Failed to
unmarshal response.", e));
+
+ return;
+ }
+ }
+
+ f.onDone(null, err);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished future [fut=" + f + ", reqId=" +
res.requestId() + ", err=" + err + ']');
+ }
+
+ /**
+ *
+ */
+ void cancelAll() {
+ IgniteCheckedException err = new IgniteCheckedException("Data
streamer has been cancelled: " + IgniteDataStreamerImpl.this);
+
+ for (IgniteInternalFuture<?> f : locFuts) {
+ try {
+ f.cancel();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to cancel mini-future.", e);
+ }
+ }
+
+ for (GridFutureAdapter<?> f : reqs.values())
+ f.onDone(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ int size;
+
+ synchronized (this) {
+ size = entries.size();
+ }
+
+ return S.toString(Buffer.class, this,
+ "entriesCnt", size,
+ "locFutsSize", locFuts.size(),
+ "reqsSize", reqs.size());
+ }
+ }
+
+ /**
+  * Peer-deploy aware descriptor of the data streamer. Lazily detects and caches the class used for
+  * deployment and its class loader.
+  */
+ private class DataStreamerPda implements GridPeerDeployAware {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** Cached deploy class. */
+     private Class<?> cls;
+
+     /** Cached class loader. */
+     private ClassLoader ldr;
+
+     /** Objects inspected to detect the deploy class and class loader. */
+     private Collection<Object> objs;
+
+     /**
+      * Constructs data streamer peer-deploy aware.
+      *
+      * @param objs Objects inspected to detect the deploy class and class loader.
+      */
+     private DataStreamerPda(Object... objs) {
+         this.objs = Arrays.asList(objs);
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * An explicitly configured {@code depCls} wins. Otherwise the objects are scanned until a
+      * non-JDK class is detected; if none is found, {@code IgniteDataStreamerImpl} itself is used.
+      */
+     @Override public Class<?> deployClass() {
+         if (cls != null)
+             return cls;
+
+         Class<?> detected = null;
+
+         if (depCls != null)
+             detected = depCls;
+         else {
+             for (Object o : objs) {
+                 // Stop as soon as a non-JDK class has been detected.
+                 if (detected != null && !U.isJdk(detected))
+                     break;
+
+                 if (o != null)
+                     detected = U.detectClass(o);
+             }
+
+             if (detected == null || U.isJdk(detected))
+                 detected = IgniteDataStreamerImpl.class;
+         }
+
+         assert detected != null : "Failed to detect deploy class [objs=" + objs + ']';
+
+         cls = detected;
+
+         return cls;
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Uses the class loader of the deploy class, falling back to {@code U.gridClassLoader()}.
+      */
+     @Override public ClassLoader classLoader() {
+         if (ldr != null)
+             return ldr;
+
+         ClassLoader found = deployClass().getClassLoader();
+
+         // Safety.
+         if (found == null)
+             found = U.gridClassLoader();
+
+         assert found != null : "Failed to detect classloader [objs=" + objs + ']';
+
+         ldr = found;
+
+         return ldr;
+     }
+ }
+
+ /**
+  * Simple externalizable key-value pair used to pass a single entry around.
+  */
+ private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** Key. */
+     private K key;
+
+     /** Value. */
+     private V val;
+
+     /**
+      * @param key Key (must not be {@code null}).
+      * @param val Value.
+      */
+     private Entry0(K key, @Nullable V val) {
+         assert key != null;
+
+         this.key = key;
+         this.val = val;
+     }
+
+     /**
+      * For {@link Externalizable}.
+      */
+     @SuppressWarnings("UnusedDeclaration")
+     public Entry0() {
+         // No-op.
+     }
+
+     /** {@inheritDoc} */
+     @Override public K getKey() {
+         return key;
+     }
+
+     /** {@inheritDoc} */
+     @Override public V getValue() {
+         return val;
+     }
+
+     /**
+      * {@inheritDoc}
+      *
+      * @return Value held before the replacement.
+      */
+     @Override public V setValue(V val) {
+         V prev = this.val;
+
+         this.val = val;
+
+         return prev;
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Writes the key followed by the value.
+      */
+     @Override public void writeExternal(ObjectOutput out) throws IOException {
+         out.writeObject(key);
+         out.writeObject(val);
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Reads the key followed by the value, mirroring {@code writeExternal}.
+      */
+     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+         key = (K)in.readObject();
+         val = (V)in.readObject();
+     }
+ }
+
+ /**
+  * Collection wrapper with compact serialization of map entries: the size is written first,
+  * followed by the key and value of every entry.
+  */
+ private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** Wrapped delegate. */
+     private Collection<Map.Entry<K, V>> delegate;
+
+     /** Optional portable processor for converting keys and values on write. */
+     private GridPortableProcessor portable;
+
+     /**
+      * @param delegate Delegate.
+      * @param portable Portable processor ({@code null} to write keys and values as is).
+      */
+     private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
+         this.delegate = delegate;
+         this.portable = portable;
+     }
+
+     /**
+      * For {@link Externalizable}.
+      */
+     public Entries0() {
+         // No-op.
+     }
+
+     /** {@inheritDoc} */
+     @Override public Iterator<Entry<K, V>> iterator() {
+         return delegate.iterator();
+     }
+
+     /** {@inheritDoc} */
+     @Override public int size() {
+         return delegate.size();
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * When a portable processor is present, keys and values are converted with it before writing.
+      */
+     @Override public void writeExternal(ObjectOutput out) throws IOException {
+         out.writeInt(delegate.size());
+
+         boolean convert = portable != null;
+
+         for (Map.Entry<K, V> e : delegate) {
+             if (!convert) {
+                 out.writeObject(e.getKey());
+                 out.writeObject(e.getValue());
+
+                 continue;
+             }
+
+             out.writeObject(portable.marshalToPortable(e.getKey()));
+             out.writeObject(portable.marshalToPortable(e.getValue()));
+         }
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Rebuilds the delegate as a list of {@code Entry0} pairs.
+      */
+     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+         int cnt = in.readInt();
+
+         delegate = new ArrayList<>(cnt);
+
+         int read = 0;
+
+         while (read < cnt) {
+             Object k = in.readObject();
+             Object v = in.readObject();
+
+             delegate.add(new Entry0<>((K)k, (V)v));
+
+             read++;
+         }
+     }
+ }
+
+ /**
+  * Isolated updater which only loads entry initial value.
+  */
+ private static class IsolatedUpdater<K, V> implements Updater<K, V> {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * For every entry sets the initial value directly on the internal cache entry (the DHT cache
+      * is used when the cache is near). Entries hitting an invalid partition or a removed cache
+      * entry are skipped silently; other checked failures are logged and the loop continues.
+      */
+     @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+         IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
+
+         GridCacheAdapter<K, V> target = proxy.context().cache();
+
+         if (target.isNear())
+             target = target.context().near().dht();
+
+         GridCacheContext<K, V> cctx = target.context();
+
+         long topVer = cctx.affinity().affinityTopologyVersion();
+
+         // One version is generated for the whole batch.
+         GridCacheVersion ver = cctx.versions().next(topVer);
+
+         boolean toPortable = cctx.portableEnabled();
+
+         for (Map.Entry<K, V> e : entries) {
+             try {
+                 K key = e.getKey();
+                 V val = e.getValue();
+
+                 if (toPortable) {
+                     key = (K)cctx.marshalToPortable(key);
+                     val = (V)cctx.marshalToPortable(val);
+                 }
+
+                 GridCacheEntryEx<K, V> cacheEntry = target.entryEx(key, topVer);
+
+                 cacheEntry.unswap(true, false);
+
+                 cacheEntry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
+                     GridDrType.DR_LOAD);
+
+                 cctx.evicts().touch(cacheEntry, topVer);
+             }
+             catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
+                 // No-op.
+             }
+             catch (IgniteCheckedException ex) {
+                 IgniteLogger log = cache.unwrap(Ignite.class).log();
+
+                 U.error(log, "Failed to set initial value for cache entry: " + e, ex);
+             }
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
index 9dd4a8e,0000000..2934153
mode 100644,000000..100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
@@@ -1,316 -1,0 +1,316 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Processor behind data streamers. Creates and tracks streamer instances, runs the background
+ * thread that periodically flushes streamers with auto-flush enabled, and serves data load
+ * requests arriving from other nodes.
+ */
+public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
+    /** Loaders map (access is not supposed to be highly concurrent). */
+    private final Collection<IgniteDataStreamerImpl<K, V>> ldrs = new GridConcurrentHashSet<>();
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Flushing thread. */
+    private Thread flusher;
+
+    /** Queue of streamers awaiting their next automatic flush. */
+    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
+
+    /** Marshaller. */
+    private final Marshaller marsh;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public IgniteDataStreamerProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof GridDataLoadRequest;
+
+                processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
+            }
+        });
+
+        marsh = ctx.config().getMarshaller();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Starts the flusher thread (nothing is started on a daemon node).
+     */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
+            @Override protected void body() throws InterruptedException {
+                while (!isCancelled()) {
+                    IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
+
+                    if (!busyLock.enterBusy())
+                        return;
+
+                    try {
+                        // A closed streamer is simply dropped from the queue.
+                        if (ldr.isClosed())
+                            continue;
+
+                        ldr.tryFlush();
+
+                        flushQ.offer(ldr);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            }
+        });
+
+        flusher.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Started data streamer processor.");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops accepting requests, stops the flusher thread and closes every active streamer.
+     */
+    @Override public void onKernalStop(boolean cancel) {
+        if (ctx.config().isDaemon())
+            return;
+
+        ctx.io().removeMessageListener(TOPIC_DATALOAD);
+
+        busyLock.block();
+
+        U.interrupt(flusher);
+        U.join(flusher, log);
+
+        for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
+            if (log.isDebugEnabled())
+                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
+
+            try {
+                ldr.closeEx(cancel);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close data streamer: " + ldr, e);
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Stopped data streamer processor.");
+    }
+
+    /**
+     * Creates a streamer and tracks it until its internal future completes.
+     *
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @param compact {@code true} if data streamer should transfer data in compact format.
+     * @return Data streamer.
+     * @throws IllegalStateException If the grid is stopping.
+     */
+    public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
+
+        try {
+            final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
+
+            ldrs.add(ldr);
+
-            ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
++            ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
+                    boolean b = ldrs.remove(ldr);
+
+                    assert b : "Loader has not been added to set: " + ldr;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Loader has been completed: " + ldr);
+                }
+            });
+
+            return ldr;
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Creates a streamer that transfers data in compact format.
+     *
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @return Data streamer.
+     */
+    public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
+        return dataStreamer(cacheName, true);
+    }
+
+    /**
+     * Handles a data load request: resolves the class loader, unmarshals the batch and the updater,
+     * runs the update job and sends a response carrying the job error, if any.
+     *
+     * @param nodeId Sender ID.
+     * @param req Request.
+     */
+    private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
+        if (!busyLock.enterBusy()) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring data load request (node is stopping): " + req);
+
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Processing data load request: " + req);
+
+            Object topic;
+
+            try {
+                topic = marsh.unmarshal(req.responseTopicBytes(), null);
+            }
+            catch (IgniteCheckedException e) {
+                // Without the topic there is nowhere to send a response to.
+                U.error(log, "Failed to unmarshal topic from request: " + req, e);
+
+                return;
+            }
+
+            ClassLoader clsLdr;
+
+            if (req.forceLocalDeployment())
+                clsLdr = U.gridClassLoader();
+            else {
+                GridDeployment dep = ctx.deploy().getGlobalDeployment(
+                    req.deploymentMode(),
+                    req.sampleClassName(),
+                    req.sampleClassName(),
+                    req.userVersion(),
+                    nodeId,
+                    req.classLoaderId(),
+                    req.participants(),
+                    null);
+
+                if (dep == null) {
+                    sendResponse(nodeId,
+                        topic,
+                        req.requestId(),
+                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
+                            ", req=" + req + ']'),
+                        false);
+
+                    return;
+                }
+
+                clsLdr = dep.classLoader();
+            }
+
+            Collection<Map.Entry<K, V>> col;
+            IgniteDataStreamer.Updater<K, V> updater;
+
+            try {
+                col = marsh.unmarshal(req.collectionBytes(), clsLdr);
+                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
+
+                sendResponse(nodeId, topic, req.requestId(), e, false);
+
+                return;
+            }
+
+            IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
+                log,
+                req.cacheName(),
+                col,
+                req.ignoreDeploymentOwnership(),
+                req.skipStore(),
+                updater);
+
+            Exception err = null;
+
+            try {
+                job.call();
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to finish update job.", e);
+
+                err = e;
+            }
+
+            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Sends a response for a data load request. If the error itself cannot be marshalled, a plain
+     * error describing the failure is sent instead, so that the requesting node is not left
+     * waiting for a response that never comes.
+     *
+     * @param nodeId Node ID.
+     * @param resTopic Response topic.
+     * @param reqId Request ID.
+     * @param err Error.
+     * @param forceLocDep Force local deployment.
+     */
+    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
+        boolean forceLocDep) {
+        byte[] errBytes;
+
+        try {
+            errBytes = err != null ? marsh.marshal(err) : null;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to marshal message.", e);
+
+            // Fall back to an error built from strings only.
+            try {
+                errBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error [err=" +
+                    err + ", marshalErr=" + e + ']'));
+            }
+            catch (IgniteCheckedException e0) {
+                U.error(log, "Failed to marshal fallback error (response will not be sent).", e0);
+
+                return;
+            }
+        }
+
+        GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
+
+        try {
+            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            if (ctx.discovery().alive(nodeId))
+                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+            else if (log.isDebugEnabled())
+                log.debug("Node has left the grid: " + nodeId);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>> ldrsSize: " + ldrs.size());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de65eb0/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
----------------------------------------------------------------------