Merge branches 'ignite-471' and 'sprint-2' of 
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-471

Conflicts:
        
modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
        
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
        
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2f6f8cdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2f6f8cdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2f6f8cdf

Branch: refs/heads/ignite-471
Commit: 2f6f8cdf4bd3e182bb6e7153487ebb45185de603
Parents: f28b750 d8c0766
Author: Valentin Kulichenko <[email protected]>
Authored: Thu Mar 12 10:59:03 2015 -0700
Committer: Valentin Kulichenko <[email protected]>
Committed: Thu Mar 12 10:59:03 2015 -0700

----------------------------------------------------------------------
 DEVNOTES.txt                                    |   13 +-
 LICENSE.txt                                     |    2 +-
 NOTICE.txt                                      |    0
 RELEASE_NOTES.txt                               |    0
 assembly/dependencies-optional-fabric.xml       |    2 +-
 assembly/dependencies-optional-hadoop.xml       |    2 +-
 assembly/dependencies-schema-import.xml         |   56 +
 assembly/dependencies-schema-load.xml           |   56 -
 assembly/release-hadoop.xml                     |   16 +-
 bin/ignite-schema-import.bat                    |  116 ++
 bin/ignite-schema-import.sh                     |   78 +
 bin/ignite-schema-load.bat                      |  116 --
 bin/ignite-schema-load.sh                       |   78 -
 config/hadoop/default-config.xml                |   10 +-
 docs/core-site.ignite.xml                       |   90 -
 docs/hadoop_readme.md                           |  134 --
 docs/hadoop_readme.pdf                          |  Bin 82297 -> 0 bytes
 docs/hive-site.ignite.xml                       |   37 -
 docs/mapred-site.ignite.xml                     |   66 -
 examples/config/example-cache.xml               |    4 +-
 examples/config/filesystem/example-igfs.xml     |   13 +-
 examples/config/store/example-database.script   |   27 +
 .../config/store/example-jdbc-pojo-store.xml    |  142 ++
 .../datagrid/CacheDataLoaderExample.java        |   85 -
 .../datagrid/CacheDataStreamerExample.java      |   85 +
 .../datagrid/CachePopularNumbersExample.java    |   14 +-
 .../store/CacheNodeWithStoreStartup.java        |   33 +
 .../datagrid/store/CacheStoreExample.java       |    1 +
 .../store/CacheStoreLoadDataExample.java        |   10 +-
 .../ignite/examples/datagrid/store/Person.java  |  103 --
 .../store/dummy/CacheDummyPersonStore.java      |    2 +-
 .../hibernate/CacheHibernatePersonStore.java    |    2 +-
 .../datagrid/store/hibernate/Person.hbm.xml     |    2 +-
 .../datagrid/store/hibernate/hibernate.cfg.xml  |    3 -
 .../store/jdbc/CacheJdbcPersonStore.java        |    4 +-
 .../store/jdbc/CacheJdbcPojoPersonStore.java    |   96 ++
 .../examples/datagrid/store/model/Person.java   |  155 ++
 .../datagrid/store/model/PersonKey.java         |   97 ++
 .../MemcacheRestExampleNodeStartup.java         |    5 +-
 .../ScalarCachePopularNumbersExample.scala      |   49 +-
 .../ignite/examples/CacheExamplesSelfTest.java  |    6 +-
 .../clients/src/test/resources/spring-cache.xml |    8 +-
 .../ignite/codegen/MessageCodeGenerator.java    |    1 +
 modules/core/licenses/snaptree-bsd-license.txt  |   35 +
 modules/core/licenses/snaptree-license.txt      |   35 -
 modules/core/licenses/sun-bcl-license.txt       |   50 -
 .../src/main/java/org/apache/ignite/Ignite.java |   10 +-
 .../java/org/apache/ignite/IgniteCache.java     |   24 +
 .../org/apache/ignite/IgniteDataLoader.java     |  379 ----
 .../org/apache/ignite/IgniteDataStreamer.java   |  401 +++++
 .../apache/ignite/cache/CachePreloadMode.java   |   67 -
 .../apache/ignite/cache/CacheRebalanceMode.java |   67 +
 .../java/org/apache/ignite/cache/GridCache.java |   12 +-
 .../store/jdbc/CacheAbstractJdbcStore.java      |   33 +-
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |   52 +-
 .../apache/ignite/cluster/ClusterMetrics.java   |   66 +-
 .../configuration/CacheConfiguration.java       |  231 ++-
 .../configuration/FileSystemConfiguration.java  |   55 +-
 .../ignite/events/CachePreloadingEvent.java     |  172 --
 .../ignite/events/CacheRebalancingEvent.java    |  172 ++
 .../java/org/apache/ignite/events/Event.java    |    2 +-
 .../org/apache/ignite/events/EventType.java     |   44 +-
 .../igfs/IgfsIpcEndpointConfiguration.java      |  241 +++
 .../apache/ignite/igfs/IgfsIpcEndpointType.java |   29 +
 .../ClusterLocalNodeMetricsMXBeanImpl.java      |    5 +
 .../ignite/internal/ClusterMetricsSnapshot.java |   26 +-
 .../ignite/internal/GridKernalContext.java      |    8 +-
 .../ignite/internal/GridKernalContextImpl.java  |   12 +-
 .../org/apache/ignite/internal/GridTopic.java   |    2 +-
 .../apache/ignite/internal/IgniteKernal.java    |   16 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |    8 +-
 .../communication/GridIoMessageFactory.java     |    8 +-
 .../processors/cache/GridCacheAdapter.java      |   22 +-
 .../processors/cache/GridCacheAttributes.java   |   32 +-
 .../processors/cache/GridCacheContext.java      |    8 +-
 .../processors/cache/GridCacheEventManager.java |   12 +-
 .../cache/GridCacheEvictionManager.java         |    6 +-
 .../processors/cache/GridCacheMapEntry.java     |    2 +-
 .../GridCachePartitionExchangeManager.java      |    6 +-
 .../processors/cache/GridCacheProcessor.java    |   74 +-
 .../processors/cache/GridCacheUtils.java        |    4 +-
 .../processors/cache/IgniteCacheProxy.java      |    7 +
 .../GridDistributedCacheAdapter.java            |   10 +-
 .../distributed/dht/GridDhtLocalPartition.java  |    4 +-
 .../distributed/dht/GridDhtLockFuture.java      |    4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   14 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   10 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   30 +-
 .../preloader/GridDhtPartitionDemandPool.java   |   78 +-
 .../preloader/GridDhtPartitionSupplyPool.java   |   22 +-
 .../dht/preloader/GridDhtPreloader.java         |   14 +-
 .../transactions/IgniteTxLocalAdapter.java      |    4 +-
 .../version/GridCacheRawVersionedEntry.java     |    4 +-
 .../dataload/GridDataLoadCacheUpdaters.java     |  206 ---
 .../dataload/GridDataLoadRequest.java           |  451 -----
 .../dataload/GridDataLoadResponse.java          |  166 --
 .../dataload/GridDataLoadUpdateJob.java         |  150 --
 .../dataload/GridDataLoaderFuture.java          |   62 -
 .../dataload/GridDataLoaderProcessor.java       |  308 ----
 .../dataload/IgniteDataLoaderEntry.java         |  170 --
 .../dataload/IgniteDataLoaderImpl.java          | 1392 ---------------
 .../internal/processors/dataload/package.html   |   24 -
 .../datastreamer/DataStreamProcessor.java       |  308 ++++
 .../datastreamer/DataStreamerCacheUpdaters.java |  206 +++
 .../datastreamer/DataStreamerEntry.java         |  170 ++
 .../datastreamer/DataStreamerFuture.java        |   60 +
 .../datastreamer/DataStreamerImpl.java          | 1404 +++++++++++++++
 .../datastreamer/DataStreamerRequest.java       |  451 +++++
 .../datastreamer/DataStreamerResponse.java      |  166 ++
 .../datastreamer/DataStreamerUpdateJob.java     |  150 ++
 .../processors/datastreamer/package.html        |   24 +
 .../dr/GridDrDataLoadCacheUpdater.java          |   92 -
 .../dr/IgniteDrDataStreamerCacheUpdater.java    |   92 +
 .../processors/igfs/IgfsDataManager.java        |   25 +-
 .../internal/processors/igfs/IgfsImpl.java      |    2 +-
 .../internal/processors/igfs/IgfsServer.java    |   49 +-
 .../processors/igfs/IgfsServerManager.java      |   35 +-
 .../processors/rest/GridRestCommand.java        |    8 +-
 .../processors/rest/GridRestProcessor.java      |    2 -
 .../processors/streamer/IgniteStreamerImpl.java |    4 +-
 .../ignite/internal/util/IgniteUtils.java       |    2 +-
 .../util/ipc/IpcServerEndpointDeserializer.java |   66 -
 .../visor/cache/VisorCacheCompactTask.java      |   66 -
 .../cache/VisorCachePreloadConfiguration.java   |   16 +-
 .../visor/node/VisorIgfsConfiguration.java      |    5 +-
 .../mxbean/ClusterLocalNodeMetricsMXBean.java   |    4 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   11 +-
 .../resources/META-INF/classnames.properties    |   34 +-
 .../resources/META-INF/licenses/apache-2.0.txt  |    0
 .../core/src/test/config/discovery-stress.xml   |    2 +-
 modules/core/src/test/config/example-cache.xml  |    4 +-
 modules/core/src/test/config/igfs-loopback.xml  |    6 +-
 modules/core/src/test/config/igfs-shmem.xml     |    8 +-
 .../config/load/dsi-49-server-production.xml    |    4 +-
 .../src/test/config/load/dsi-load-client.xml    |    4 +-
 .../src/test/config/load/dsi-load-server.xml    |    4 +-
 .../core/src/test/config/spring-cache-load.xml  |    2 +-
 .../core/src/test/config/spring-multicache.xml  |   16 +-
 .../test/config/websession/spring-cache-1.xml   |    6 +-
 .../test/config/websession/spring-cache-2.xml   |    6 +-
 .../test/config/websession/spring-cache-3.xml   |    6 +-
 .../ignite/internal/ClusterMetricsSelfTest.java |    2 +
 .../GridDiscoveryManagerAliveCacheSelfTest.java |    4 +-
 .../GridCacheAbstractFailoverSelfTest.java      |    4 +-
 .../GridCacheAbstractLocalStoreSelfTest.java    |    8 +-
 ...acheAbstractUsersAffinityMapperSelfTest.java |    4 +-
 .../cache/GridCacheBasicStoreAbstractTest.java  |    4 +-
 .../GridCacheConcurrentTxMultiNodeTest.java     |    4 +-
 ...idCacheConfigurationConsistencySelfTest.java |    6 +-
 ...ridCacheConfigurationValidationSelfTest.java |    8 +-
 .../cache/GridCacheDeploymentSelfTest.java      |    4 +-
 ...idCacheGetAndTransformStoreAbstractTest.java |    4 +-
 .../cache/GridCacheIncrementTransformTest.java  |    4 +-
 .../cache/GridCacheMultiUpdateLockSelfTest.java |    4 +-
 .../GridCacheOrderedPreloadingSelfTest.java     |    6 +-
 .../cache/GridCacheP2PUndeploySelfTest.java     |    8 +-
 .../cache/GridCachePartitionedGetSelfTest.java  |    4 +-
 ...hePartitionedProjectionAffinitySelfTest.java |    4 +-
 .../GridCachePreloadingEvictionsSelfTest.java   |    6 +-
 .../GridCacheQueryInternalKeysSelfTest.java     |    4 +-
 .../cache/GridCacheSwapPreloadSelfTest.java     |    4 +-
 .../GridCacheValueBytesPreloadingSelfTest.java  |    2 +-
 ...idCacheValueConsistencyAbstractSelfTest.java |    4 +-
 .../IgniteCacheAbstractStopBusySelfTest.java    |    2 +-
 .../cache/IgniteCacheTxPreloadNoWriteTest.java  |  111 ++
 ...tAllUpdateNonPreloadedPartitionSelfTest.java |    4 +-
 ...dCacheQueueMultiNodeConsistencySelfTest.java |    3 +-
 .../IgniteCollectionAbstractTest.java           |    4 +-
 ...GridCachePartitionedNodeRestartSelfTest.java |    4 +-
 ...idCachePartitionedNodeRestartTxSelfTest.java |    4 +-
 ...PartitionedQueueCreateMultiNodeSelfTest.java |    4 +-
 .../GridCacheAbstractNodeRestartSelfTest.java   |    6 +-
 .../GridCacheAbstractPrimarySyncSelfTest.java   |    4 +-
 ...acheEntrySetIterationPreloadingSelfTest.java |    2 +-
 ...heExpiredEntriesPreloadAbstractSelfTest.java |    6 +-
 .../distributed/GridCacheLockAbstractTest.java  |    4 +-
 ...dCacheMultithreadedFailoverAbstractTest.java |    4 +-
 ...dCachePartitionedAffinityFilterSelfTest.java |    4 +-
 .../GridCachePreloadEventsAbstractSelfTest.java |    8 +-
 .../GridCachePreloadLifecycleAbstractTest.java  |    6 +-
 ...GridCachePreloadRestartAbstractSelfTest.java |   10 +-
 ...iteTxConsistencyRestartAbstractSelfTest.java |    6 +-
 .../IgniteTxPreloadAbstractTest.java            |    6 +-
 .../dht/GridCacheAtomicNearCacheSelfTest.java   |    4 +-
 ...dCacheColocatedTxSingleThreadedSelfTest.java |    4 +-
 ...GridCacheDhtEvictionNearReadersSelfTest.java |    4 +-
 .../dht/GridCacheDhtEvictionSelfTest.java       |    4 +-
 .../dht/GridCacheDhtInternalEntrySelfTest.java  |    4 +-
 .../dht/GridCacheDhtMappingSelfTest.java        |    4 +-
 .../dht/GridCacheDhtPreloadBigDataSelfTest.java |   10 +-
 .../dht/GridCacheDhtPreloadDelayedSelfTest.java |   26 +-
 .../GridCacheDhtPreloadDisabledSelfTest.java    |    8 +-
 .../GridCacheDhtPreloadMessageCountTest.java    |    4 +-
 .../dht/GridCacheDhtPreloadPutGetSelfTest.java  |    6 +-
 .../dht/GridCacheDhtPreloadSelfTest.java        |   28 +-
 .../GridCacheDhtPreloadStartStopSelfTest.java   |   12 +-
 .../dht/GridCacheDhtPreloadUnloadSelfTest.java  |   10 +-
 ...ePartitionedNearDisabledMetricsSelfTest.java |    4 +-
 ...idCachePartitionedPreloadEventsSelfTest.java |    8 +-
 ...dCachePartitionedTopologyChangeSelfTest.java |    4 +-
 ...ridCachePartitionedUnloadEventsSelfTest.java |   12 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |    4 +-
 ...unctionExcludeNeighborsAbstractSelfTest.java |    4 +-
 ...GridCacheAtomicMultiNodeFullApiSelfTest.java |    4 +-
 ...idCacheAtomicPartitionedMetricsSelfTest.java |    4 +-
 .../near/GridCacheNearEvictionSelfTest.java     |    7 +-
 .../near/GridCacheNearMultiGetSelfTest.java     |    4 +-
 .../near/GridCacheNearOnlyTopologySelfTest.java |    4 +-
 .../GridCacheNearPartitionedClearSelfTest.java  |    4 +-
 .../near/GridCacheNearReadersSelfTest.java      |    4 +-
 .../near/GridCacheNearTxMultiNodeSelfTest.java  |    4 +-
 ...AffinityExcludeNeighborsPerformanceTest.java |    4 +-
 .../GridCachePartitionedAffinitySelfTest.java   |    4 +-
 .../GridCachePartitionedBasicOpSelfTest.java    |    2 +-
 .../near/GridCachePartitionedEventSelfTest.java |    4 +-
 ...idCachePartitionedHitsAndMissesSelfTest.java |   10 +-
 .../GridCachePartitionedMetricsSelfTest.java    |    4 +-
 ...achePartitionedMultiNodeCounterSelfTest.java |    4 +-
 ...achePartitionedMultiNodeFullApiSelfTest.java |    4 +-
 .../GridCachePartitionedNodeRestartTest.java    |    4 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |    4 +-
 ...achePartitionedPreloadLifecycleSelfTest.java |    4 +-
 ...hePartitionedQueryMultiThreadedSelfTest.java |    2 +-
 .../GridCachePartitionedTxSalvageSelfTest.java  |    2 +-
 ...achePartitionedTxSingleThreadedSelfTest.java |    4 +-
 .../GridCacheReplicatedInvalidateSelfTest.java  |    4 +-
 .../GridCacheReplicatedNodeRestartSelfTest.java |    6 +-
 .../GridCacheSyncReplicatedPreloadSelfTest.java |    6 +-
 ...CacheReplicatedPreloadLifecycleSelfTest.java |    4 +-
 .../GridCacheReplicatedPreloadSelfTest.java     |   16 +-
 ...eplicatedPreloadStartStopEventsSelfTest.java |    6 +-
 .../GridCacheEvictionFilterSelfTest.java        |    4 +-
 .../GridCacheLruNearEvictionPolicySelfTest.java |    6 +-
 ...heNearOnlyLruNearEvictionPolicySelfTest.java |    6 +-
 ...ridCacheContinuousQueryAbstractSelfTest.java |    4 +-
 ...dCacheAbstractReduceFieldsQuerySelfTest.java |    4 +-
 .../dataload/GridDataLoaderImplSelfTest.java    |  208 ---
 .../dataload/GridDataLoaderPerformanceTest.java |  197 ---
 .../GridDataLoaderProcessorSelfTest.java        |  971 -----------
 .../DataStreamProcessorSelfTest.java            |  970 +++++++++++
 .../datastreamer/DataStreamerImplSelfTest.java  |  205 +++
 .../IgniteDataStreamerPerformanceTest.java      |  197 +++
 .../processors/igfs/IgfsAbstractSelfTest.java   |   24 +-
 ...sCachePerBlockLruEvictionPolicySelfTest.java |   12 +-
 .../processors/igfs/IgfsMetricsSelfTest.java    |   12 +-
 .../processors/igfs/IgfsModesSelfTest.java      |    9 +-
 ...IpcEndpointRegistrationAbstractSelfTest.java |   21 +-
 ...dpointRegistrationOnLinuxAndMacSelfTest.java |    7 +-
 ...pcEndpointRegistrationOnWindowsSelfTest.java |    5 +-
 .../processors/igfs/IgfsSizeSelfTest.java       |    7 +-
 .../IpcServerEndpointDeserializerSelfTest.java  |  160 --
 .../ipc/shmem/IpcSharedMemoryNodeStartup.java   |    9 +-
 .../loadtests/GridCacheMultiNodeLoadTest.java   |    4 +-
 .../capacity/spring-capacity-cache.xml          |    4 +-
 .../loadtests/colocation/GridTestMain.java      |    4 +-
 .../loadtests/colocation/spring-colocation.xml  |    4 +-
 .../GridCachePartitionedAtomicLongLoadTest.java |    2 +-
 .../loadtests/discovery/GridGcTimeoutTest.java  |    2 +-
 .../mapper/GridContinuousMapperLoadTest1.java   |    2 +-
 .../mapper/GridContinuousMapperLoadTest2.java   |    2 +-
 .../GridP2PContinuousDeploymentSelfTest.java    |    4 +-
 .../tcp/GridCacheDhtLockBackupSelfTest.java     |    4 +-
 .../ignite/testframework/junits/IgniteMock.java |    4 +-
 .../junits/common/GridCommonAbstractTest.java   |    4 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   10 +-
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |    2 -
 .../webapp/META-INF/ignite-webapp-config.xml    |   12 +-
 modules/hadoop/config/core-site.ignite.xml      |   90 +
 modules/hadoop/config/hive-site.ignite.xml      |   37 +
 modules/hadoop/config/mapred-site.ignite.xml    |   66 +
 modules/hadoop/docs/hadoop_readme.md            |  135 ++
 modules/hadoop/docs/hadoop_readme.pdf           |  Bin 0 -> 82297 bytes
 .../internal/processors/hadoop/HadoopSetup.java |   33 +-
 .../hadoop/igfs/HadoopIgfsEndpoint.java         |    5 +-
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |    4 +-
 ...Igfs20FileSystemLoopbackPrimarySelfTest.java |   14 +-
 ...oopIgfs20FileSystemShmemPrimarySelfTest.java |   14 +-
 .../igfs/HadoopIgfsDualAbstractSelfTest.java    |   25 +-
 ...oopSecondaryFileSystemConfigurationTest.java |   25 +-
 .../apache/ignite/igfs/IgfsEventsTestSuite.java |   44 +-
 .../igfs/IgfsNearOnlyMultiNodeSelfTest.java     |   10 +-
 .../IgniteHadoopFileSystemAbstractSelfTest.java |   16 +-
 .../IgniteHadoopFileSystemClientSelfTest.java   |   11 +-
 ...IgniteHadoopFileSystemHandshakeSelfTest.java |   14 +-
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |   10 +-
 ...niteHadoopFileSystemLoggerStateSelfTest.java |   12 +-
 ...adoopFileSystemLoopbackAbstractSelfTest.java |   12 +-
 ...teHadoopFileSystemSecondaryModeSelfTest.java |   23 +-
 ...teHadoopFileSystemShmemAbstractSelfTest.java |   12 +-
 .../hadoop/HadoopCommandLineTest.java           |   16 +-
 .../hadoop/HadoopPopularWordsTest.java          |    4 +-
 .../GridCacheAbstractFieldsQuerySelfTest.java   |    4 +-
 .../cache/GridCacheAbstractQuerySelfTest.java   |    4 +-
 .../cache/GridCacheCrossCacheQuerySelfTest.java |    4 +-
 .../GridCacheCrossCacheQuerySelfTestNewApi.java |    4 +-
 .../cache/GridIndexingWithNoopSwapSelfTest.java |    4 +-
 .../near/GridCacheQueryNodeRestartSelfTest.java |    6 +-
 .../query/h2/sql/GridQueryParsingTest.java      |    5 +-
 .../tcp/GridOrderedMessageCancelSelfTest.java   |    4 +-
 .../scala/org/apache/ignite/scalar/scalar.scala |   16 +-
 modules/schema-import/pom.xml                   |  107 ++
 modules/schema-import/readme.txt                |  214 +++
 .../main/java/media/data_connection_48x48.png   |  Bin 0 -> 4443 bytes
 .../src/main/java/media/error_48x48.png         |  Bin 0 -> 4349 bytes
 .../src/main/java/media/ignite_128x128.png      |  Bin 0 -> 4917 bytes
 .../src/main/java/media/ignite_16x16.png        |  Bin 0 -> 608 bytes
 .../src/main/java/media/ignite_24x24.png        |  Bin 0 -> 930 bytes
 .../src/main/java/media/ignite_32x32.png        |  Bin 0 -> 1203 bytes
 .../src/main/java/media/ignite_48x48.png        |  Bin 0 -> 1868 bytes
 .../src/main/java/media/ignite_64x64.png        |  Bin 0 -> 2453 bytes
 .../src/main/java/media/information_48x48.png   |  Bin 0 -> 4102 bytes
 .../src/main/java/media/question_48x48.png      |  Bin 0 -> 3857 bytes
 .../src/main/java/media/sign_warning_48x48.png  |  Bin 0 -> 2988 bytes
 .../schema-import/src/main/java/media/style.css |  117 ++
 .../src/main/java/media/text_tree_48x48.png     |  Bin 0 -> 2567 bytes
 .../ignite/schema/generator/PojoGenerator.java  |  414 +++++
 .../schema/generator/SnippetGenerator.java      |  138 ++
 .../ignite/schema/generator/XmlGenerator.java   |  347 ++++
 .../ignite/schema/model/PojoDescriptor.java     |  510 ++++++
 .../apache/ignite/schema/model/PojoField.java   |  420 +++++
 .../schema/parser/DatabaseMetadataParser.java   |  108 ++
 .../apache/ignite/schema/parser/DbColumn.java   |   76 +
 .../apache/ignite/schema/parser/DbTable.java    |  105 ++
 .../parser/dialect/DB2MetadataDialect.java      |   30 +
 .../parser/dialect/DatabaseMetadataDialect.java |   78 +
 .../parser/dialect/JdbcMetadataDialect.java     |  141 ++
 .../parser/dialect/OracleMetadataDialect.java   |  281 +++
 .../ignite/schema/ui/ConfirmCallable.java       |   81 +
 .../org/apache/ignite/schema/ui/Controls.java   |  661 +++++++
 .../org/apache/ignite/schema/ui/GridPaneEx.java |  177 ++
 .../org/apache/ignite/schema/ui/MessageBox.java |  246 +++
 .../apache/ignite/schema/ui/ModalDialog.java    |   50 +
 .../ignite/schema/ui/SchemaImportApp.java       | 1615 ++++++++++++++++++
 .../ignite/schema/ui/TextColumnValidator.java   |   32 +
 .../schema/test/AbstractSchemaImportTest.java   |  134 ++
 .../test/generator/PojoGeneratorTest.java       |   70 +
 .../schema/test/generator/XmlGeneratorTest.java |   50 +
 .../apache/ignite/schema/test/model/Ignite.xml  |  390 +++++
 .../apache/ignite/schema/test/model/Objects.txt |  502 ++++++
 .../ignite/schema/test/model/ObjectsKey.txt     |   96 ++
 .../ignite/schema/test/model/Primitives.txt     |  506 ++++++
 .../ignite/schema/test/model/PrimitivesKey.txt  |   96 ++
 .../test/parser/DbMetadataParserTest.java       |  118 ++
 .../testsuites/IgniteSchemaImportTestSuite.java |   41 +
 modules/schema-load/pom.xml                     |  107 --
 .../main/java/media/data_connection_48x48.png   |  Bin 4443 -> 0 bytes
 .../src/main/java/media/error_48x48.png         |  Bin 4349 -> 0 bytes
 .../src/main/java/media/ignite_128x128.png      |  Bin 4917 -> 0 bytes
 .../src/main/java/media/ignite_16x16.png        |  Bin 608 -> 0 bytes
 .../src/main/java/media/ignite_24x24.png        |  Bin 930 -> 0 bytes
 .../src/main/java/media/ignite_32x32.png        |  Bin 1203 -> 0 bytes
 .../src/main/java/media/ignite_48x48.png        |  Bin 1868 -> 0 bytes
 .../src/main/java/media/ignite_64x64.png        |  Bin 2453 -> 0 bytes
 .../src/main/java/media/information_48x48.png   |  Bin 4102 -> 0 bytes
 .../src/main/java/media/question_48x48.png      |  Bin 3857 -> 0 bytes
 .../src/main/java/media/sign_warning_48x48.png  |  Bin 2988 -> 0 bytes
 .../schema-load/src/main/java/media/style.css   |  117 --
 .../src/main/java/media/text_tree_48x48.png     |  Bin 2567 -> 0 bytes
 .../ignite/schema/generator/PojoGenerator.java  |  414 -----
 .../schema/generator/SnippetGenerator.java      |  138 --
 .../ignite/schema/generator/XmlGenerator.java   |  347 ----
 .../ignite/schema/model/PojoDescriptor.java     |  510 ------
 .../apache/ignite/schema/model/PojoField.java   |  420 -----
 .../schema/parser/DatabaseMetadataParser.java   |  108 --
 .../apache/ignite/schema/parser/DbColumn.java   |   76 -
 .../apache/ignite/schema/parser/DbTable.java    |  105 --
 .../parser/dialect/DB2MetadataDialect.java      |   30 -
 .../parser/dialect/DatabaseMetadataDialect.java |   78 -
 .../parser/dialect/JdbcMetadataDialect.java     |  141 --
 .../parser/dialect/OracleMetadataDialect.java   |  281 ---
 .../ignite/schema/ui/ConfirmCallable.java       |   81 -
 .../org/apache/ignite/schema/ui/Controls.java   |  661 -------
 .../org/apache/ignite/schema/ui/GridPaneEx.java |  177 --
 .../org/apache/ignite/schema/ui/MessageBox.java |  246 ---
 .../apache/ignite/schema/ui/ModalDialog.java    |   50 -
 .../apache/ignite/schema/ui/SchemaLoadApp.java  | 1615 ------------------
 .../ignite/schema/ui/TextColumnValidator.java   |   32 -
 .../schema/load/AbstractSchemaLoaderTest.java   |  134 --
 .../load/generator/PojoGeneratorTest.java       |   70 -
 .../schema/load/generator/XmlGeneratorTest.java |   50 -
 .../apache/ignite/schema/load/model/Ignite.xml  |  390 -----
 .../apache/ignite/schema/load/model/Objects.txt |  502 ------
 .../ignite/schema/load/model/ObjectsKey.txt     |   96 --
 .../ignite/schema/load/model/Primitives.txt     |  506 ------
 .../ignite/schema/load/model/PrimitivesKey.txt  |   96 --
 .../load/parser/DbMetadataParserTest.java       |  118 --
 .../testsuites/IgniteSchemaLoadTestSuite.java   |   41 -
 .../org/apache/ignite/IgniteSpringBean.java     |    4 +-
 .../ignite/visor/commands/VisorConsole.scala    |    3 +-
 .../commands/cache/VisorCacheCommand.scala      |   30 +-
 .../cache/VisorCacheCompactCommand.scala        |  151 --
 .../commands/events/VisorEventsCommand.scala    |    4 +-
 .../commands/top/VisorTopologyCommand.scala     |    7 +-
 .../scala/org/apache/ignite/visor/visor.scala   |   12 +-
 .../cache/VisorCacheCompactCommandSpec.scala    |  103 --
 .../testsuites/VisorConsoleSelfTestSuite.scala  |    3 +-
 .../apache/ignite/visor/plugin/VisorPlugin.java |    8 +-
 .../cache/IgniteSqlQueryBenchmark.java          |    2 +-
 .../cache/IgniteSqlQueryJoinBenchmark.java      |    2 +-
 pom.xml                                         |   26 +-
 .../basic-concepts/async-support.md             |    0
 .../basic-concepts/getting-started.md           |    0
 .../basic-concepts/ignite-life-cycel.md         |    0
 .../documentation/basic-concepts/maven-setup.md |    0
 .../basic-concepts/what-is-ignite.md            |    0
 .../basic-concepts/zero-deployment.md           |    0
 wiki/documentation/clustering/aws-config.md     |    0
 wiki/documentation/clustering/cluster-config.md |    0
 wiki/documentation/clustering/cluster-groups.md |    0
 wiki/documentation/clustering/cluster.md        |    0
 .../documentation/clustering/leader-election.md |    0
 wiki/documentation/clustering/network-config.md |    0
 wiki/documentation/clustering/node-local-map.md |    0
 .../documentation/compute-grid/checkpointing.md |    0
 .../compute-grid/collocate-compute-and-data.md  |    0
 wiki/documentation/compute-grid/compute-grid.md |    0
 .../documentation/compute-grid/compute-tasks.md |    0
 .../compute-grid/distributed-closures.md        |    0
 .../compute-grid/executor-service.md            |    0
 .../compute-grid/fault-tolerance.md             |    0
 .../compute-grid/job-scheduling.md              |    0
 .../compute-grid/load-balancing.md              |    0
 .../data-grid/affinity-collocation.md           |    0
 .../data-grid/automatic-db-integration.md       |    0
 wiki/documentation/data-grid/cache-modes.md     |    0
 wiki/documentation/data-grid/cache-queries.md   |    0
 wiki/documentation/data-grid/data-grid.md       |    0
 wiki/documentation/data-grid/data-loading.md    |    0
 wiki/documentation/data-grid/evictions.md       |    0
 .../data-grid/hibernate-l2-cache.md             |    0
 wiki/documentation/data-grid/jcache.md          |    0
 wiki/documentation/data-grid/off-heap-memory.md |    0
 .../documentation/data-grid/persistent-store.md |    0
 wiki/documentation/data-grid/rebalancing.md     |    0
 wiki/documentation/data-grid/transactions.md    |    0
 .../data-grid/web-session-clustering.md         |    0
 .../distributed-data-structures/atomic-types.md |    0
 .../countdownlatch.md                           |    0
 .../distributed-data-structures/id-generator.md |    0
 .../queue-and-set.md                            |    0
 .../distributed-events/automatic-batching.md    |    0
 wiki/documentation/distributed-events/events.md |    0
 .../distributed-file-system/igfs.md             |    0
 .../distributed-messaging/messaging.md          |    0
 wiki/documentation/http/configuration.md        |    0
 wiki/documentation/http/rest-api.md             |    0
 .../release-notes/release-notes.md              |    0
 .../service-grid/cluster-singletons.md          |    0
 .../service-grid/service-configuration.md       |    0
 .../service-grid/service-example.md             |    0
 wiki/documentation/service-grid/service-grid.md |    0
 451 files changed, 16003 insertions(+), 15323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 0000000,85624d4..9eac46e
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@@ -1,0 -1,307 +1,308 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.datastreamer;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.processors.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.thread.*;
+ import org.jetbrains.annotations.*;
+ 
++import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ 
+ /**
+  *
+  */
+ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
+     /** Loaders map (access is not supposed to be highly concurrent). */
+     private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
+ 
+     /** Busy lock. */
+     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ 
+     /** Flushing thread. */
+     private Thread flusher;
+ 
+     /** */
+     private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new 
DelayQueue<>();
+ 
+     /** Marshaller. */
+     private final Marshaller marsh;
+ 
+     /**
+      * @param ctx Kernal context.
+      */
+     public DataStreamProcessor(GridKernalContext ctx) {
+         super(ctx);
+ 
+         ctx.io().addMessageListener(TOPIC_DATASTREAM, new 
GridMessageListener() {
+             @Override public void onMessage(UUID nodeId, Object msg) {
+                 assert msg instanceof DataStreamerRequest;
+ 
+                 processRequest(nodeId, (DataStreamerRequest)msg);
+             }
+         });
+ 
+         marsh = ctx.config().getMarshaller();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void start() throws IgniteCheckedException {
+         if (ctx.config().isDaemon())
+             return;
+ 
+         flusher = new IgniteThread(new GridWorker(ctx.gridName(), 
"grid-data-loader-flusher", log) {
+             @Override protected void body() throws InterruptedException {
+                 while (!isCancelled()) {
+                     DataStreamerImpl<K, V> ldr = flushQ.take();
+ 
+                     if (!busyLock.enterBusy())
+                         return;
+ 
+                     try {
+                         if (ldr.isClosed())
+                             continue;
+ 
+                         ldr.tryFlush();
+ 
+                         flushQ.offer(ldr);
+                     }
+                     finally {
+                         busyLock.leaveBusy();
+                     }
+                 }
+             }
+         });
+ 
+         flusher.start();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Started data streamer processor.");
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onKernalStop(boolean cancel) {
+         if (ctx.config().isDaemon())
+             return;
+ 
+         ctx.io().removeMessageListener(TOPIC_DATASTREAM);
+ 
+         busyLock.block();
+ 
+         U.interrupt(flusher);
+         U.join(flusher, log);
+ 
+         for (DataStreamerImpl<?, ?> ldr : ldrs) {
+             if (log.isDebugEnabled())
+                 log.debug("Closing active data streamer on grid stop [ldr=" + 
ldr + ", cancel=" + cancel + ']');
+ 
+             try {
+                 ldr.closeEx(cancel);
+             }
+             catch (IgniteInterruptedCheckedException e) {
+                 U.warn(log, "Interrupted while waiting for completion of the 
data streamer: " + ldr, e);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to close data streamer: " + ldr, e);
+             }
+         }
+ 
+         if (log.isDebugEnabled())
+             log.debug("Stopped data streamer processor.");
+     }
+ 
+     /**
+      * @param cacheName Cache name ({@code null} for default cache).
+      * @return Data loader.
+      */
+     public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
+         if (!busyLock.enterBusy())
+             throw new IllegalStateException("Failed to create data streamer 
(grid is stopping).");
+ 
+         try {
+             final DataStreamerImpl<K, V> ldr = new DataStreamerImpl<>(ctx, 
cacheName, flushQ);
+ 
+             ldrs.add(ldr);
+ 
+             ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+                 @Override public void apply(IgniteInternalFuture<?> f) {
+                     boolean b = ldrs.remove(ldr);
+ 
+                     assert b : "Loader has not been added to set: " + ldr;
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Loader has been completed: " + ldr);
+                 }
+             });
+ 
+             return ldr;
+         }
+         finally {
+             busyLock.leaveBusy();
+         }
+     }
+ 
+     /**
+      * @param nodeId Sender ID.
+      * @param req Request.
+      */
+     private void processRequest(UUID nodeId, DataStreamerRequest req) {
+         if (!busyLock.enterBusy()) {
+             if (log.isDebugEnabled())
+                 log.debug("Ignoring data load request (node is stopping): " + 
req);
+ 
+             return;
+         }
+ 
+         try {
+             if (log.isDebugEnabled())
+                 log.debug("Processing data load request: " + req);
+ 
+             Object topic;
+ 
+             try {
+                 topic = marsh.unmarshal(req.responseTopicBytes(), null);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to unmarshal topic from request: " + 
req, e);
+ 
+                 return;
+             }
+ 
+             ClassLoader clsLdr;
+ 
+             if (req.forceLocalDeployment())
+                 clsLdr = U.gridClassLoader();
+             else {
+                 GridDeployment dep = ctx.deploy().getGlobalDeployment(
+                     req.deploymentMode(),
+                     req.sampleClassName(),
+                     req.sampleClassName(),
+                     req.userVersion(),
+                     nodeId,
+                     req.classLoaderId(),
+                     req.participants(),
+                     null);
+ 
+                 if (dep == null) {
+                     sendResponse(nodeId,
+                         topic,
+                         req.requestId(),
+                         new IgniteCheckedException("Failed to get deployment 
for request [sndId=" + nodeId +
+                             ", req=" + req + ']'),
+                         false);
+ 
+                     return;
+                 }
+ 
+                 clsLdr = dep.classLoader();
+             }
+ 
+             IgniteDataStreamer.Updater<K, V> updater;
+ 
+             try {
+                 updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to unmarshal message [nodeId=" + nodeId 
+ ", req=" + req + ']', e);
+ 
+                 sendResponse(nodeId, topic, req.requestId(), e, false);
+ 
+                 return;
+             }
+ 
+             Collection<DataStreamerEntry> col = req.entries();
+ 
+             DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
+                 log,
+                 req.cacheName(),
+                 col,
+                 req.ignoreDeploymentOwnership(),
+                 req.skipStore(),
+                 updater);
+ 
+             Exception err = null;
+ 
+             try {
+                 job.call();
+             }
+             catch (Exception e) {
+                 U.error(log, "Failed to finish update job.", e);
+ 
+                 err = e;
+             }
+ 
+             sendResponse(nodeId, topic, req.requestId(), err, 
req.forceLocalDeployment());
+         }
+         finally {
+             busyLock.leaveBusy();
+         }
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param resTopic Response topic.
+      * @param reqId Request ID.
+      * @param err Error.
+      * @param forceLocDep Force local deployment.
+      */
+     private void sendResponse(UUID nodeId, Object resTopic, long reqId, 
@Nullable Throwable err,
+         boolean forceLocDep) {
 -        byte[] errBytes;
++        ByteBuffer errBytes;
+ 
+         try {
+             errBytes = err != null ? marsh.marshal(err) : null;
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to marshal message.", e);
+ 
+             return;
+         }
+ 
+         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, 
forceLocDep);
+ 
+         try {
+             ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+         }
+         catch (IgniteCheckedException e) {
+             if (ctx.discovery().alive(nodeId))
+                 U.error(log, "Failed to respond to node [nodeId=" + nodeId + 
", res=" + res + ']', e);
+             else if (log.isDebugEnabled())
+                 log.debug("Node has left the grid: " + nodeId);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void printMemoryStats() {
+         X.println(">>>");
+         X.println(">>> Data streamer processor memory stats [grid=" + 
ctx.gridName() + ']');
+         X.println(">>>   ldrsSize: " + ldrs.size());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 0000000,dd8df35..432c6bf
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@@ -1,0 -1,1403 +1,1404 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.datastreamer;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.affinity.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.processors.cacheobject.*;
+ import org.apache.ignite.internal.processors.dr.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
++import java.nio.*;
+ import java.util.*;
+ import java.util.Map.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.events.EventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ 
+ /**
+  * Data streamer implementation.
+  */
+ @SuppressWarnings("unchecked")
+ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, 
Delayed {
+     /** Isolated updater. */
+     private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+ 
+     /** Cache updater. */
+     private Updater<K, V> updater = ISOLATED_UPDATER;
+ 
+     /** */
 -    private byte[] updaterBytes;
++    private ByteBuffer updaterBytes;
+ 
+     /** Max remap count before issuing an error. */
+     private static final int DFLT_MAX_REMAP_CNT = 32;
+ 
+     /** Log reference. */
+     private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
+ 
+     /** Logger. */
+     private static IgniteLogger log;
+ 
+     /** Cache name ({@code null} for default cache). */
+     private final String cacheName;
+ 
+ 
+     /** Per-node buffer size. */
+     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+     private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+ 
+     /** */
+     private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+ 
+     /** */
+     private long autoFlushFreq;
+ 
+     /** Mapping. */
+     @GridToStringInclude
+     private ConcurrentMap<UUID, Buffer> bufMappings = new 
ConcurrentHashMap8<>();
+ 
+     /** Discovery listener. */
+     private final GridLocalEventListener discoLsnr;
+ 
+     /** Context. */
+     private final GridKernalContext ctx;
+ 
+     /** */
+     private final IgniteCacheObjectProcessor cacheObjProc;
+ 
+     /** */
+     private final CacheObjectContext cacheObjCtx;
+ 
+     /** Communication topic for responses. */
+     private final Object topic;
+ 
+     /** */
 -    private byte[] topicBytes;
++    private ByteBuffer topicBytes;
+ 
+     /** {@code True} if data loader has been cancelled. */
+     private volatile boolean cancelled;
+ 
+     /** Active futures of this data loader. */
+     @GridToStringInclude
+     private final Collection<IgniteInternalFuture<?>> activeFuts = new 
GridConcurrentHashSet<>();
+ 
+     /** Closure to remove from active futures. */
+     @GridToStringExclude
+     private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new 
IgniteInClosure<IgniteInternalFuture<?>>() {
+         @Override public void apply(IgniteInternalFuture<?> t) {
+             boolean rmv = activeFuts.remove(t);
+ 
+             assert rmv;
+         }
+     };
+ 
+     /** Job peer deploy aware. */
+     private volatile GridPeerDeployAware jobPda;
+ 
+     /** Deployment class. */
+     private Class<?> depCls;
+ 
+     /** Future to track loading finish. */
+     private final GridFutureAdapter<?> fut;
+ 
+     /** Public API future to track loading finish. */
+     private final IgniteFuture<?> publicFut;
+ 
+     /** Busy lock. */
+     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ 
+     /** Closed flag. */
+     private final AtomicBoolean closed = new AtomicBoolean();
+ 
+     /** */
+     private volatile long lastFlushTime = U.currentTimeMillis();
+ 
+     /** */
+     private final DelayQueue<DataStreamerImpl<K, V>> flushQ;
+ 
+     /** */
+     private boolean skipStore;
+ 
+     /** */
+     private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+ 
+     /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} 
printed */
+     private static boolean isWarningPrinted;
+ 
+     /**
+      * @param ctx Grid kernal context.
+      * @param cacheName Cache name.
+      * @param flushQ Flush queue.
+      */
+     public DataStreamerImpl(
+         final GridKernalContext ctx,
+         @Nullable final String cacheName,
+         DelayQueue<DataStreamerImpl<K, V>> flushQ
+     ) {
+         assert ctx != null;
+ 
+         this.ctx = ctx;
+         this.cacheObjProc = ctx.cacheObjects();
+ 
+         if (log == null)
+             log = U.logger(ctx, logRef, DataStreamerImpl.class);
+ 
+         ClusterNode node = 
F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+ 
+         if (node == null)
+             throw new IllegalStateException("Cache doesn't exist: " + 
cacheName);
+ 
+         this.cacheObjCtx = ctx.cacheObjects().contextForCache(node, 
cacheName);
+         this.cacheName = cacheName;
+         this.flushQ = flushQ;
+ 
+         discoLsnr = new GridLocalEventListener() {
+             @Override public void onEvent(Event evt) {
+                 assert evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT;
+ 
+                 DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+ 
+                 UUID id = discoEvt.eventNode().id();
+ 
+                 // Remap regular mappings.
+                 final Buffer buf = bufMappings.remove(id);
+ 
+                 if (buf != null) {
+                     // Only async notification is possible since
+                     // discovery thread may be trapped otherwise.
+                     ctx.closure().callLocalSafe(
+                         new Callable<Object>() {
+                             @Override public Object call() throws Exception {
+                                 buf.onNodeLeft();
+ 
+                                 return null;
+                             }
+                         },
+                         true /* system pool */
+                     );
+                 }
+             }
+         };
+ 
+         ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, 
EVT_NODE_LEFT);
+ 
+         // Generate unique topic for this loader.
+         topic = 
TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+ 
+         ctx.io().addMessageListener(topic, new GridMessageListener() {
+             @Override public void onMessage(UUID nodeId, Object msg) {
+                 assert msg instanceof DataStreamerResponse;
+ 
+                 DataStreamerResponse res = (DataStreamerResponse)msg;
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Received data load response: " + res);
+ 
+                 Buffer buf = bufMappings.get(nodeId);
+ 
+                 if (buf != null)
+                     buf.onResponse(res);
+ 
+                 else if (log.isDebugEnabled())
+                     log.debug("Ignoring response since node has left 
[nodeId=" + nodeId + ", ");
+             }
+         });
+ 
+         if (log.isDebugEnabled())
+             log.debug("Added response listener within topic: " + topic);
+ 
+         fut = new DataStreamerFuture(this);
+ 
+         publicFut = new IgniteFutureImpl<>(fut);
+     }
+ 
+     /**
+      * @return Cache object context.
+      */
+     public CacheObjectContext cacheObjectContext() {
+         return cacheObjCtx;
+     }
+ 
+     /**
+      * Enters busy lock.
+      */
+     private void enterBusy() {
+         if (!busyLock.enterBusy())
+             throw new IllegalStateException("Data streamer has been closed.");
+     }
+ 
+     /**
+      * Leaves busy lock.
+      */
+     private void leaveBusy() {
+         busyLock.leaveBusy();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> future() {
+         return publicFut;
+     }
+ 
+     /**
+      * @return Internal future.
+      */
+     public IgniteInternalFuture<?> internalFuture() {
+         return fut;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void deployClass(Class<?> depCls) {
+         this.depCls = depCls;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void updater(Updater<K, V> updater) {
+         A.notNull(updater, "updater");
+ 
+         this.updater = updater;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean allowOverwrite() {
+         return updater != ISOLATED_UPDATER;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void allowOverwrite(boolean allow) {
+         if (allow == allowOverwrite())
+             return;
+ 
+         ClusterNode node = 
F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+ 
+         if (node == null)
+             throw new IgniteException("Failed to get node for cache: " + 
cacheName);
+ 
+         updater = allow ? DataStreamerCacheUpdaters.<K, V>individual() : 
ISOLATED_UPDATER;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean skipStore() {
+         return skipStore;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void skipStore(boolean skipStore) {
+         this.skipStore = skipStore;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override @Nullable public String cacheName() {
+         return cacheName;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int perNodeBufferSize() {
+         return bufSize;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void perNodeBufferSize(int bufSize) {
+         A.ensure(bufSize > 0, "bufSize > 0");
+ 
+         this.bufSize = bufSize;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int perNodeParallelOperations() {
+         return parallelOps;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void perNodeParallelOperations(int parallelOps) {
+         this.parallelOps = parallelOps;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long autoFlushFrequency() {
+         return autoFlushFreq;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void autoFlushFrequency(long autoFlushFreq) {
+         A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+ 
+         long old = this.autoFlushFreq;
+ 
+         if (autoFlushFreq != old) {
+             this.autoFlushFreq = autoFlushFreq;
+ 
+             if (autoFlushFreq != 0 && old == 0)
+                 flushQ.add(this);
+             else if (autoFlushFreq == 0)
+                 flushQ.remove(this);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> addData(Map<K, V> entries) throws 
IllegalStateException {
+         A.notNull(entries, "entries");
+ 
+         return addData(entries.entrySet());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> addData(Collection<? extends 
Map.Entry<K, V>> entries) {
+         A.notEmpty(entries, "entries");
+ 
+         enterBusy();
+ 
+         try {
+             GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+ 
+             resFut.listen(rmvActiveFut);
+ 
+             activeFuts.add(resFut);
+ 
+             Collection<KeyCacheObject> keys = null;
+ 
+             if (entries.size() > 1) {
+                 keys = new GridConcurrentHashSet<>(entries.size(), 
U.capacity(entries.size()), 1);
+ 
+                 for (Map.Entry<K, V> entry : entries)
+                     keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, 
entry.getKey(), true));
+             }
+ 
+             Collection<? extends DataStreamerEntry> entries0 = 
F.viewReadOnly(entries, new C1<Entry<K, V>, DataStreamerEntry>() {
+                 @Override public DataStreamerEntry apply(Entry<K, V> e) {
+                     KeyCacheObject key = 
cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), true);
+                     CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, 
e.getValue(), true);
+ 
+                     return new DataStreamerEntry(key, val);
+                 }
+             });
+ 
+             load0(entries0, resFut, keys, 0);
+ 
+             return new IgniteFutureImpl<>(resFut);
+         }
+         catch (IgniteException e) {
+             return new IgniteFinishedFutureImpl<>(e);
+         }
+         finally {
+             leaveBusy();
+         }
+     }
+ 
+     /**
+      * @param key Key.
+      * @param val Value.
+      * @return Future.
+      */
+     public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject 
val) {
+         return addDataInternal(Collections.singleton(new 
DataStreamerEntry(key, val)));
+     }
+ 
+     /**
+      * @param key Key.
+      * @return Future.
+      */
+     public IgniteFuture<?> removeDataInternal(KeyCacheObject key) {
+         return addDataInternal(Collections.singleton(new 
DataStreamerEntry(key, null)));
+     }
+ 
+     /**
+      * @param entries Entries.
+      * @return Future.
+      */
+     public IgniteFuture<?> addDataInternal(Collection<? extends 
DataStreamerEntry> entries) {
+         enterBusy();
+ 
+         GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+ 
+         try {
+             resFut.listen(rmvActiveFut);
+ 
+             activeFuts.add(resFut);
+ 
+             Collection<KeyCacheObject> keys = null;
+ 
+             if (entries.size() > 1) {
+                 keys = new GridConcurrentHashSet<>(entries.size(), 
U.capacity(entries.size()), 1);
+ 
+                 for (DataStreamerEntry entry : entries)
+                     keys.add(entry.getKey());
+             }
+ 
+             load0(entries, resFut, keys, 0);
+ 
+             return new IgniteFutureImpl<>(resFut);
+         }
+         catch (Throwable e) {
+             resFut.onDone(e);
+ 
+             if (e instanceof Error)
+                 throw e;
+ 
+             return new IgniteFinishedFutureImpl<>(e);
+         }
+         finally {
+             leaveBusy();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+         A.notNull(entry, "entry");
+ 
+         return addData(F.asList(entry));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> addData(K key, V val) {
+         A.notNull(key, "key");
+ 
+         KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, 
true);
+         CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
+ 
+         return addDataInternal(Collections.singleton(new 
DataStreamerEntry(key0, val0)));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> removeData(K key) {
+         return addData(key, null);
+     }
+ 
+     /**
+      * @param entries Entries.
+      * @param resFut Result future.
+      * @param activeKeys Active keys.
+      * @param remaps Remaps count.
+      */
+     private void load0(
+         Collection<? extends DataStreamerEntry> entries,
+         final GridFutureAdapter<Object> resFut,
+         @Nullable final Collection<KeyCacheObject> activeKeys,
+         final int remaps
+     ) {
+         assert entries != null;
+ 
+         if (!isWarningPrinted) {
+             synchronized (this) {
+                 if (!allowOverwrite() && !isWarningPrinted) {
+                     U.warn(log, "Data streamer will not overwrite existing 
cache entries for better performance " +
+                         "(to change, set allowOverwrite to true)");
+                 }
+ 
+                 isWarningPrinted = true;
+             }
+         }
+ 
+         Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new 
HashMap<>();
+ 
+         boolean initPda = ctx.deploy().enabled() && jobPda == null;
+ 
+         for (DataStreamerEntry entry : entries) {
+             List<ClusterNode> nodes;
+ 
+             try {
+                 KeyCacheObject key = entry.getKey();
+ 
+                 assert key != null;
+ 
+                 if (initPda) {
+                     jobPda = new DataStreamerPda(key.value(cacheObjCtx, 
false),
+                         entry.getValue() != null ? 
entry.getValue().value(cacheObjCtx, false) : null,
+                         updater);
+ 
+                     initPda = false;
+                 }
+ 
+                 nodes = nodes(key);
+             }
+             catch (IgniteCheckedException e) {
+                 resFut.onDone(e);
+ 
+                 return;
+             }
+ 
+             if (F.isEmpty(nodes)) {
+                 resFut.onDone(new ClusterTopologyException("Failed to map key 
to node " +
+                     "(no nodes with cache found in topology) [infos=" + 
entries.size() +
+                     ", cacheName=" + cacheName + ']'));
+ 
+                 return;
+             }
+ 
+             for (ClusterNode node : nodes) {
+                 Collection<DataStreamerEntry> col = mappings.get(node);
+ 
+                 if (col == null)
+                     mappings.put(node, col = new ArrayList<>());
+ 
+                 col.add(entry);
+             }
+         }
+ 
+         for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : 
mappings.entrySet()) {
+             final UUID nodeId = e.getKey().id();
+ 
+             Buffer buf = bufMappings.get(nodeId);
+ 
+             if (buf == null) {
+                 Buffer old = bufMappings.putIfAbsent(nodeId, buf = new 
Buffer(e.getKey()));
+ 
+                 if (old != null)
+                     buf = old;
+             }
+ 
+             final Collection<DataStreamerEntry> entriesForNode = e.getValue();
+ 
+             IgniteInClosure<IgniteInternalFuture<?>> lsnr = new 
IgniteInClosure<IgniteInternalFuture<?>>() {
+                 @Override public void apply(IgniteInternalFuture<?> t) {
+                     try {
+                         t.get();
+ 
+                         if (activeKeys != null) {
+                             for (DataStreamerEntry e : entriesForNode)
+                                 activeKeys.remove(e.getKey());
+ 
+                             if (activeKeys.isEmpty())
+                                 resFut.onDone();
+                         }
+                         else {
+                             assert entriesForNode.size() == 1;
+ 
+                             // That has been a single key,
+                             // so complete result future right away.
+                             resFut.onDone();
+                         }
+                     }
+                     catch (IgniteCheckedException e1) {
+                         if (log.isDebugEnabled())
+                             log.debug("Future finished with error [nodeId=" + 
nodeId + ", err=" + e1 + ']');
+ 
+                         if (cancelled) {
+                             resFut.onDone(new IgniteCheckedException("Data 
streamer has been cancelled: " +
+                                 DataStreamerImpl.this, e1));
+                         }
+                         else if (remaps + 1 > maxRemapCnt) {
+                             resFut.onDone(new IgniteCheckedException("Failed 
to finish operation (too many remaps): "
+                                 + remaps), e1);
+                         }
+                         else
+                             load0(entriesForNode, resFut, activeKeys, remaps 
+ 1);
+                     }
+                 }
+             };
+ 
+             GridFutureAdapter<?> f;
+ 
+             try {
+                 f = buf.update(entriesForNode, lsnr);
+             }
+             catch (IgniteInterruptedCheckedException e1) {
+                 resFut.onDone(e1);
+ 
+                 return;
+             }
+ 
+             if (ctx.discovery().node(nodeId) == null) {
+                 if (bufMappings.remove(nodeId, buf))
+                     buf.onNodeLeft();
+ 
+                 if (f != null)
+                     f.onDone(new ClusterTopologyCheckedException("Failed to 
wait for request completion " +
+                         "(node has left): " + nodeId));
+             }
+         }
+     }
+ 
+     /**
+      * @param key Key to map.
+      * @return Nodes to send requests to.
+      * @throws IgniteCheckedException If failed.
+      */
+     private List<ClusterNode> nodes(KeyCacheObject key) throws 
IgniteCheckedException {
+         GridAffinityProcessor aff = ctx.affinity();
+ 
+         return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, 
key) :
+             Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+     }
+ 
+     /**
+      * Performs flush.
+      *
+      * @throws IgniteCheckedException If failed.
+      */
+     private void doFlush() throws IgniteCheckedException {
+         lastFlushTime = U.currentTimeMillis();
+ 
+         List<IgniteInternalFuture> activeFuts0 = null;
+ 
+         int doneCnt = 0;
+ 
+         for (IgniteInternalFuture<?> f : activeFuts) {
+             if (!f.isDone()) {
+                 if (activeFuts0 == null)
+                     activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 
1.2));
+ 
+                 activeFuts0.add(f);
+             }
+             else {
+                 f.get();
+ 
+                 doneCnt++;
+             }
+         }
+ 
+         if (activeFuts0 == null || activeFuts0.isEmpty())
+             return;
+ 
+         while (true) {
+             Queue<IgniteInternalFuture<?>> q = null;
+ 
+             for (Buffer buf : bufMappings.values()) {
+                 IgniteInternalFuture<?> flushFut = buf.flush();
+ 
+                 if (flushFut != null) {
+                     if (q == null)
+                         q = new ArrayDeque<>(bufMappings.size() * 2);
+ 
+                     q.add(flushFut);
+                 }
+             }
+ 
+             if (q != null) {
+                 assert !q.isEmpty();
+ 
+                 boolean err = false;
+ 
+                 for (IgniteInternalFuture fut = q.poll(); fut != null; fut = 
q.poll()) {
+                     try {
+                         fut.get();
+                     }
+                     catch (IgniteCheckedException e) {
+                         if (log.isDebugEnabled())
+                             log.debug("Failed to flush buffer: " + e);
+ 
+                         err = true;
+                     }
+                 }
+ 
+                 if (err)
+                     // Remaps needed - flush buffers.
+                     continue;
+             }
+ 
+             doneCnt = 0;
+ 
+             for (int i = 0; i < activeFuts0.size(); i++) {
+                 IgniteInternalFuture f = activeFuts0.get(i);
+ 
+                 if (f == null)
+                     doneCnt++;
+                 else if (f.isDone()) {
+                     f.get();
+ 
+                     doneCnt++;
+ 
+                     activeFuts0.set(i, null);
+                 }
+                 else
+                     break;
+             }
+ 
+             if (doneCnt == activeFuts0.size())
+                 return;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("ForLoopReplaceableByForEach")
+     @Override public void flush() throws IgniteException {
+         enterBusy();
+ 
+         try {
+             doFlush();
+         }
+         catch (IgniteCheckedException e) {
+             throw U.convertException(e);
+         }
+         finally {
+             leaveBusy();
+         }
+     }
+ 
+     /**
+      * Flushes every internal buffer if buffer was flushed before passed in
+      * threshold.
+      * <p>
+      * Does not wait for result and does not fail on errors assuming that 
this method
+      * should be called periodically.
+      */
+     @Override public void tryFlush() throws IgniteInterruptedException {
+         if (!busyLock.enterBusy())
+             return;
+ 
+         try {
+             for (Buffer buf : bufMappings.values())
+                 buf.flush();
+ 
+             lastFlushTime = U.currentTimeMillis();
+         }
+         catch (IgniteInterruptedCheckedException e) {
+             throw U.convertException(e);
+         }
+         finally {
+             leaveBusy();
+         }
+     }
+ 
+     /**
+      * @param cancel {@code True} to close with cancellation.
+      * @throws IgniteException If failed.
+      */
+     @Override public void close(boolean cancel) throws IgniteException {
+         try {
+             closeEx(cancel);
+         }
+         catch (IgniteCheckedException e) {
+             throw U.convertException(e);
+         }
+     }
+ 
+     /**
+      * @param cancel {@code True} to close with cancellation.
+      * @throws IgniteCheckedException If failed.
+      */
+     public void closeEx(boolean cancel) throws IgniteCheckedException {
+         if (!closed.compareAndSet(false, true))
+             return;
+ 
+         busyLock.block();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Closing data streamer [ldr=" + this + ", cancel=" + 
cancel + ']');
+ 
+         IgniteCheckedException e = null;
+ 
+         try {
+             // Assuming that no methods are called on this loader after this 
method is called.
+             if (cancel) {
+                 cancelled = true;
+ 
+                 for (Buffer buf : bufMappings.values())
+                     buf.cancelAll();
+             }
+             else
+                 doFlush();
+ 
+             ctx.event().removeLocalEventListener(discoLsnr);
+ 
+             ctx.io().removeMessageListener(topic);
+         }
+         catch (IgniteCheckedException e0) {
+             e = e0;
+         }
+ 
+         fut.onDone(null, e);
+ 
+         if (e != null)
+             throw e;
+     }
+ 
+     /**
+      * @return {@code true} If the loader is closed.
+      */
+     boolean isClosed() {
+         return fut.isDone();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void close() throws IgniteException {
+         close(false);
+     }
+ 
+     /**
+      * @return Max remap count.
+      */
+     public int maxRemapCount() {
+         return maxRemapCnt;
+     }
+ 
+     /**
+      * @param maxRemapCnt New max remap count.
+      */
+     public void maxRemapCount(int maxRemapCnt) {
+         this.maxRemapCnt = maxRemapCnt;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(DataStreamerImpl.class, this);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getDelay(TimeUnit unit) {
+         return unit.convert(nextFlushTime() - U.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
+     }
+ 
+     /**
+      * @return Next flush time.
+      */
+     private long nextFlushTime() {
+         return lastFlushTime + autoFlushFreq;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int compareTo(Delayed o) {
+         return nextFlushTime() > ((DataStreamerImpl)o).nextFlushTime() ? 1 : 
-1;
+     }
+ 
+     /**
+      *
+      */
+     private class Buffer {
+         /** Node. */
+         private final ClusterNode node;
+ 
+         /** Active futures. */
+         private final Collection<IgniteInternalFuture<Object>> locFuts;
+ 
+         /** Buffered entries. */
+         private List<DataStreamerEntry> entries;
+ 
+         /** */
+         @GridToStringExclude
+         private GridFutureAdapter<Object> curFut;
+ 
+         /** Local node flag. */
+         private final boolean isLocNode;
+ 
+         /** ID generator. */
+         private final AtomicLong idGen = new AtomicLong();
+ 
+         /** Active futures. */
+         private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+ 
+         /** */
+         private final Semaphore sem;
+ 
+         /** Closure to signal on task finish. */
+         @GridToStringExclude
+         private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = 
new IgniteInClosure<IgniteInternalFuture<Object>>() {
+             @Override public void apply(IgniteInternalFuture<Object> t) {
+                 signalTaskFinished(t);
+             }
+         };
+ 
+         /**
+          * @param node Node.
+          */
+         Buffer(ClusterNode node) {
+             assert node != null;
+ 
+             this.node = node;
+ 
+             locFuts = new GridConcurrentHashSet<>();
+             reqs = new ConcurrentHashMap8<>();
+ 
+             // Cache local node flag.
+             isLocNode = node.equals(ctx.discovery().localNode());
+ 
+             entries = newEntries();
+             curFut = new GridFutureAdapter<>();
+             curFut.listen(signalC);
+ 
+             sem = new Semaphore(parallelOps);
+         }
+ 
+         /**
+          * @param newEntries Infos.
+          * @param lsnr Listener for the operation future.
+          * @throws IgniteInterruptedCheckedException If failed.
+          * @return Future for operation.
+          */
+         @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> 
newEntries,
+             IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws 
IgniteInterruptedCheckedException {
+             List<DataStreamerEntry> entries0 = null;
+             GridFutureAdapter<Object> curFut0;
+ 
+             synchronized (this) {
+                 curFut0 = curFut;
+ 
+                 curFut0.listen(lsnr);
+ 
+                 for (DataStreamerEntry entry : newEntries)
+                     entries.add(entry);
+ 
+                 if (entries.size() >= bufSize) {
+                     entries0 = entries;
+ 
+                     entries = newEntries();
+                     curFut = new GridFutureAdapter<>();
+                     curFut.listen(signalC);
+                 }
+             }
+ 
+             if (entries0 != null) {
+                 submit(entries0, curFut0);
+ 
+                 if (cancelled)
+                     curFut0.onDone(new IgniteCheckedException("Data streamer 
has been cancelled: " + DataStreamerImpl.this));
+             }
+ 
+             return curFut0;
+         }
+ 
+         /**
+          * @return Fresh collection with some space for outgrowth.
+          */
+         private List<DataStreamerEntry> newEntries() {
+             return new ArrayList<>((int)(bufSize * 1.2));
+         }
+ 
+         /**
+          * @return Future if any submitted.
+          *
+          * @throws IgniteInterruptedCheckedException If thread has been 
interrupted.
+          */
+         @Nullable IgniteInternalFuture<?> flush() throws 
IgniteInterruptedCheckedException {
+             List<DataStreamerEntry> entries0 = null;
+             GridFutureAdapter<Object> curFut0 = null;
+ 
+             synchronized (this) {
+                 if (!entries.isEmpty()) {
+                     entries0 = entries;
+                     curFut0 = curFut;
+ 
+                     entries = newEntries();
+                     curFut = new GridFutureAdapter<>();
+                     curFut.listen(signalC);
+                 }
+             }
+ 
+             if (entries0 != null)
+                 submit(entries0, curFut0);
+ 
+             // Create compound future for this flush.
+             GridCompoundFuture<Object, Object> res = null;
+ 
+             for (IgniteInternalFuture<Object> f : locFuts) {
+                 if (res == null)
+                     res = new GridCompoundFuture<>();
+ 
+                 res.add(f);
+             }
+ 
+             for (IgniteInternalFuture<Object> f : reqs.values()) {
+                 if (res == null)
+                     res = new GridCompoundFuture<>();
+ 
+                 res.add(f);
+             }
+ 
+             if (res != null)
+                 res.markInitialized();
+ 
+             return res;
+         }
+ 
+         /**
+          * Increments active tasks count.
+          *
+          * @throws IgniteInterruptedCheckedException If thread has been 
interrupted.
+          */
+         private void incrementActiveTasks() throws 
IgniteInterruptedCheckedException {
+             U.acquire(sem);
+         }
+ 
+         /**
+          * @param f Future that finished.
+          */
+         private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+             assert f != null;
+ 
+             sem.release();
+         }
+ 
+         /**
+          * @param entries Entries to submit.
+          * @param curFut Current future.
+          * @throws IgniteInterruptedCheckedException If interrupted.
+          */
+         private void submit(final Collection<DataStreamerEntry> entries, 
final GridFutureAdapter<Object> curFut)
+             throws IgniteInterruptedCheckedException {
+             assert entries != null;
+             assert !entries.isEmpty();
+             assert curFut != null;
+ 
+             incrementActiveTasks();
+ 
+             IgniteInternalFuture<Object> fut;
+ 
+             if (isLocNode) {
+                 fut = ctx.closure().callLocalSafe(
+                     new DataStreamerUpdateJob(ctx, log, cacheName, entries, 
false, skipStore, updater), false);
+ 
+                 locFuts.add(fut);
+ 
+                 fut.listen(new 
IgniteInClosure<IgniteInternalFuture<Object>>() {
+                     @Override public void apply(IgniteInternalFuture<Object> 
t) {
+                         try {
+                             boolean rmv = locFuts.remove(t);
+ 
+                             assert rmv;
+ 
+                             curFut.onDone(t.get());
+                         }
+                         catch (IgniteCheckedException e) {
+                             curFut.onDone(e);
+                         }
+                     }
+                 });
+             }
+             else {
+                 try {
+                     for (DataStreamerEntry e : entries) {
+                         e.getKey().prepareMarshal(cacheObjCtx);
+ 
+                         CacheObject val = e.getValue();
+ 
+                         if (val != null)
+                             val.prepareMarshal(cacheObjCtx);
+                     }
+ 
+                     if (updaterBytes == null) {
+                         assert updater != null;
+ 
+                         updaterBytes = 
ctx.config().getMarshaller().marshal(updater);
+                     }
+ 
+                     if (topicBytes == null)
+                         topicBytes = 
ctx.config().getMarshaller().marshal(topic);
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to marshal (request will not be 
sent).", e);
+ 
+                     return;
+                 }
+ 
+                 GridDeployment dep = null;
+                 GridPeerDeployAware jobPda0 = null;
+ 
+                 if (ctx.deploy().enabled()) {
+                     try {
+                         jobPda0 = jobPda;
+ 
+                         assert jobPda0 != null;
+ 
+                         dep = ctx.deploy().deploy(jobPda0.deployClass(), 
jobPda0.classLoader());
+ 
+                         GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
+ 
+                         if (cache != null)
+                             cache.context().deploy().onEnter();
+                     }
+                     catch (IgniteCheckedException e) {
+                         U.error(log, "Failed to deploy class (request will 
not be sent): " + jobPda0.deployClass(), e);
+ 
+                         return;
+                     }
+ 
+                     if (dep == null)
+                         U.warn(log, "Failed to deploy class (request will be 
sent): " + jobPda0.deployClass());
+                 }
+ 
+                 long reqId = idGen.incrementAndGet();
+ 
+                 fut = curFut;
+ 
+                 reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+ 
+                 DataStreamerRequest req = new DataStreamerRequest(
+                     reqId,
+                     topicBytes,
+                     cacheName,
+                     updaterBytes,
+                     entries,
+                     true,
+                     skipStore,
+                     dep != null ? dep.deployMode() : null,
+                     dep != null ? jobPda0.deployClass().getName() : null,
+                     dep != null ? dep.userVersion() : null,
+                     dep != null ? dep.participants() : null,
+                     dep != null ? dep.classLoaderId() : null,
+                     dep == null);
+ 
+                 try {
+                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Sent request to node [nodeId=" + node.id() 
+ ", req=" + req + ']');
+                 }
+                 catch (IgniteCheckedException e) {
+                     if (ctx.discovery().alive(node) && 
ctx.discovery().pingNode(node.id()))
+                         ((GridFutureAdapter<Object>)fut).onDone(e);
+                     else
+                         ((GridFutureAdapter<Object>)fut).onDone(new 
ClusterTopologyCheckedException("Failed to send " +
+                             "request (node has left): " + node.id()));
+                 }
+             }
+         }
+ 
+         /**
+          *
+          */
+         void onNodeLeft() {
+             assert !isLocNode;
+             assert bufMappings.get(node.id()) != this;
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Forcibly completing futures (node has left): " + 
node.id());
+ 
+             Exception e = new ClusterTopologyCheckedException("Failed to wait 
for request completion " +
+                 "(node has left): " + node.id());
+ 
+             for (GridFutureAdapter<Object> f : reqs.values())
+                 f.onDone(e);
+ 
+             // Make sure to complete current future.
+             GridFutureAdapter<Object> curFut0;
+ 
+             synchronized (this) {
+                 curFut0 = curFut;
+             }
+ 
+             curFut0.onDone(e);
+         }
+ 
+         /**
+          * @param res Response.
+          */
+         void onResponse(DataStreamerResponse res) {
+             if (log.isDebugEnabled())
+                 log.debug("Received data load response: " + res);
+ 
+             GridFutureAdapter<?> f = reqs.remove(res.requestId());
+ 
+             if (f == null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Future for request has not been found: " + 
res.requestId());
+ 
+                 return;
+             }
+ 
+             Throwable err = null;
+ 
 -            byte[] errBytes = res.errorBytes();
++            ByteBuffer errBytes = res.errorBytes();
+ 
+             if (errBytes != null) {
+                 try {
+                     GridPeerDeployAware jobPda0 = jobPda;
+ 
+                     err = ctx.config().getMarshaller().unmarshal(
+                         errBytes,
+                         jobPda0 != null ? jobPda0.classLoader() : 
U.gridClassLoader());
+                 }
+                 catch (IgniteCheckedException e) {
+                     f.onDone(null, new IgniteCheckedException("Failed to 
unmarshal response.", e));
+ 
+                     return;
+                 }
+             }
+ 
+             f.onDone(null, err);
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Finished future [fut=" + f + ", reqId=" + 
res.requestId() + ", err=" + err + ']');
+         }
+ 
+         /**
+          *
+          */
+         void cancelAll() {
+             IgniteCheckedException err = new IgniteCheckedException("Data 
streamer has been cancelled: " + DataStreamerImpl.this);
+ 
+             for (IgniteInternalFuture<?> f : locFuts) {
+                 try {
+                     f.cancel();
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to cancel mini-future.", e);
+                 }
+             }
+ 
+             for (GridFutureAdapter<?> f : reqs.values())
+                 f.onDone(err);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             int size;
+ 
+             synchronized (this) {
+                 size = entries.size();
+             }
+ 
+             return S.toString(Buffer.class, this,
+                 "entriesCnt", size,
+                 "locFutsSize", locFuts.size(),
+                 "reqsSize", reqs.size());
+         }
+     }
+ 
+     /**
+      * Data streamer peer-deploy aware.
+      */
+     private class DataStreamerPda implements GridPeerDeployAware {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Deploy class. */
+         private Class<?> cls;
+ 
+         /** Class loader. */
+         private ClassLoader ldr;
+ 
+         /** Collection of objects to detect deploy class and class loader. */
+         private Collection<Object> objs;
+ 
+         /**
+          * Constructs data streamer peer-deploy aware.
+          *
+          * @param objs Collection of objects to detect deploy class and class 
loader.
+          */
+         private DataStreamerPda(Object... objs) {
+             this.objs = Arrays.asList(objs);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Class<?> deployClass() {
+             if (cls == null) {
+                 Class<?> cls0 = null;
+ 
+                 if (depCls != null)
+                     cls0 = depCls;
+                 else {
+                     for (Iterator<Object> it = objs.iterator(); (cls0 == null 
|| U.isJdk(cls0)) && it.hasNext();) {
+                         Object o = it.next();
+ 
+                         if (o != null)
+                             cls0 = U.detectClass(o);
+                     }
+ 
+                     if (cls0 == null || U.isJdk(cls0))
+                         cls0 = DataStreamerImpl.class;
+                 }
+ 
+                 assert cls0 != null : "Failed to detect deploy class [objs=" 
+ objs + ']';
+ 
+                 cls = cls0;
+             }
+ 
+             return cls;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public ClassLoader classLoader() {
+             if (ldr == null) {
+                 ClassLoader ldr0 = deployClass().getClassLoader();
+ 
+                 // Safety.
+                 if (ldr0 == null)
+                     ldr0 = U.gridClassLoader();
+ 
+                 assert ldr0 != null : "Failed to detect classloader [objs=" + 
objs + ']';
+ 
+                 ldr = ldr0;
+             }
+ 
+             return ldr;
+         }
+     }
+ 
+     /**
+      * Isolated updater which only loads entry initial value.
+      */
+     private static class IsolatedUpdater implements Updater<KeyCacheObject, 
CacheObject>,
+         DataStreamerCacheUpdaters.InternalUpdater {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** {@inheritDoc} */
+         @Override public void update(IgniteCache<KeyCacheObject, CacheObject> 
cache,
+             Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
+             IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = 
(IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
+ 
+             GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = 
proxy.context().cache();
+ 
+             if (internalCache.isNear())
+                 internalCache = internalCache.context().near().dht();
+ 
+             GridCacheContext cctx = internalCache.context();
+ 
+             long topVer = cctx.affinity().affinityTopologyVersion();
+ 
+             GridCacheVersion ver = cctx.versions().next(topVer);
+ 
+             for (Map.Entry<KeyCacheObject, CacheObject> e : entries) {
+                 try {
+                     e.getKey().finishUnmarshal(cctx.cacheObjectContext(), 
cctx.deploy().globalLoader());
+ 
+                     GridCacheEntryEx entry = 
internalCache.entryEx(e.getKey(), topVer);
+ 
+                     entry.unswap(true, false);
+ 
+                     entry.initialValue(e.getValue(),
+                         ver,
+                         CU.TTL_ETERNAL,
+                         CU.EXPIRE_TIME_ETERNAL,
+                         false,
+                         topVer,
+                         GridDrType.DR_LOAD);
+ 
+                     cctx.evicts().touch(entry, topVer);
+                 }
+                 catch (GridDhtInvalidPartitionException | 
GridCacheEntryRemovedException ignored) {
+                     // No-op.
+                 }
+                 catch (IgniteCheckedException ex) {
+                     IgniteLogger log = cache.unwrap(Ignite.class).log();
+ 
+                     U.error(log, "Failed to set initial value for cache 
entry: " + e, ex);
+                 }
+             }
+         }
+     }
+ }

Reply via email to