Merge branch ignite-sprint-5 into ignite-80
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/89d98358 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/89d98358 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/89d98358 Branch: refs/heads/ignite-80 Commit: 89d983583f6b6287436b28992ebb83d5963e046e Parents: ebf8221 4b4158f Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed May 20 17:13:49 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed May 20 17:13:49 2015 -0700 ---------------------------------------------------------------------- DEVNOTES.txt | 34 +- LICENSE | 238 + LICENSE.txt | 238 - NOTICE | 12 + NOTICE.txt | 12 - assembly/release-base.xml | 10 +- assembly/release-schema-import.xml | 50 + bin/ignite-schema-import.bat | 2 +- bin/ignite-schema-import.sh | 2 +- bin/ignite.bat | 2 +- bin/ignite.sh | 2 +- bin/ignitevisorcmd.bat | 2 +- bin/ignitevisorcmd.sh | 2 +- bin/include/build-classpath.bat | 46 + bin/include/build-classpath.sh | 71 + bin/include/functions.sh | 2 +- bin/include/target-classpath.bat | 46 - bin/include/target-classpath.sh | 71 - dev-tools/.gitignore | 2 + dev-tools/build.gradle | 54 + dev-tools/gradle/wrapper/gradle-wrapper.jar | Bin 0 -> 51017 bytes .../gradle/wrapper/gradle-wrapper.properties | 6 + dev-tools/gradlew | 164 + dev-tools/src/main/groovy/jiraslurp.groovy | 312 + examples/pom.xml | 2 +- .../streaming/StreamTransformerExample.java | 4 +- .../streaming/StreamVisitorExample.java | 4 +- .../ignite/examples/streaming/package-info.java | 1 - .../streaming/wordcount/CacheConfig.java | 7 +- .../streaming/wordcount/QueryWords.java | 12 +- .../streaming/wordcount/StreamWords.java | 12 +- .../streaming/wordcount/package-info.java | 1 - .../socket/WordsSocketStreamerClient.java | 82 + .../socket/WordsSocketStreamerServer.java | 124 + .../wordcount/socket/package-info.java | 21 + modules/aop/pom.xml | 2 +- modules/aws/pom.xml | 2 +- .../config/grid-client-config.properties | 50 +- modules/clients/pom.xml | 2 +- .../ClientPropertiesConfigurationSelfTest.java | 12 +- modules/cloud/pom.xml | 4 +- .../TcpDiscoveryCloudIpFinderSelfTest.java | 2 - modules/codegen/pom.xml | 2 +- .../ignite/codegen/MessageCodeGenerator.java | 4 +- modules/core/pom.xml | 2 +- modules/core/src/main/java/META-INF/LICENSE | 238 + modules/core/src/main/java/META-INF/NOTICE | 12 + .../java/org/apache/ignite/IgniteCache.java | 5 + .../org/apache/ignite/IgniteJdbcDriver.java | 81 +- .../internal/GridEventConsumeHandler.java | 26 + .../ignite/internal/GridUpdateNotifier.java | 66 +- .../apache/ignite/internal/IgniteKernal.java | 109 +- .../org/apache/ignite/internal/IgnitionEx.java | 136 +- .../client/GridClientConfiguration.java | 2 +- .../internal/direct/DirectByteBufferStream.java | 4 +- .../interop/InteropAwareEventFilter.java | 37 + .../internal/interop/InteropBootstrap.java | 34 + .../interop/InteropBootstrapFactory.java | 39 + .../internal/interop/InteropIgnition.java | 166 + .../interop/InteropLocalEventListener.java | 28 + .../internal/interop/InteropProcessor.java | 36 + .../managers/communication/GridIoManager.java | 14 +- .../communication/GridIoMessageFactory.java | 4 +- .../GridLifecycleAwareMessageFilter.java | 5 +- .../eventstorage/GridEventStorageManager.java | 29 +- .../cache/DynamicCacheDescriptor.java | 16 +- .../processors/cache/GridCacheAdapter.java | 581 +- .../processors/cache/GridCacheContext.java | 7 + .../cache/GridCacheEvictionManager.java | 2 +- .../processors/cache/GridCacheIoManager.java | 230 +- .../processors/cache/GridCacheMapEntry.java | 123 +- .../processors/cache/GridCacheMessage.java | 8 +- .../processors/cache/GridCacheMvccManager.java | 4 +- .../GridCachePartitionExchangeManager.java | 3 + .../processors/cache/GridCacheProcessor.java | 159 +- .../processors/cache/GridCacheProxyImpl.java | 24 - .../processors/cache/GridCacheSwapManager.java | 215 +- .../processors/cache/GridCacheTtlManager.java | 156 +- .../processors/cache/GridCacheUtils.java | 6 +- .../processors/cache/IgniteInternalCache.java | 27 - ...ridCacheOptimisticCheckPreparedTxFuture.java | 434 -- ...idCacheOptimisticCheckPreparedTxRequest.java | 232 - ...dCacheOptimisticCheckPreparedTxResponse.java | 179 - .../distributed/GridCacheTxRecoveryFuture.java | 506 ++ .../distributed/GridCacheTxRecoveryRequest.java | 261 + .../GridCacheTxRecoveryResponse.java | 182 + .../GridDistributedCacheAdapter.java | 210 +- .../distributed/GridDistributedLockRequest.java | 111 +- .../GridDistributedTxFinishRequest.java | 70 +- .../distributed/GridDistributedTxMapping.java | 5 +- .../GridDistributedTxPrepareRequest.java | 112 +- .../GridDistributedTxRemoteAdapter.java | 22 +- .../distributed/dht/GridDhtCacheAdapter.java | 16 +- .../distributed/dht/GridDhtLocalPartition.java | 2 +- .../distributed/dht/GridDhtLockFuture.java | 2 - .../distributed/dht/GridDhtLockRequest.java | 45 +- .../dht/GridDhtOffHeapCacheEntry.java | 63 + .../dht/GridDhtTransactionalCacheAdapter.java | 15 +- .../distributed/dht/GridDhtTxFinishFuture.java | 3 - .../distributed/dht/GridDhtTxFinishRequest.java | 43 +- .../cache/distributed/dht/GridDhtTxLocal.java | 38 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 95 +- .../cache/distributed/dht/GridDhtTxMapping.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 100 +- .../dht/GridDhtTxPrepareRequest.java | 60 +- .../cache/distributed/dht/GridDhtTxRemote.java | 8 +- .../distributed/dht/GridNoStorageCacheMap.java | 4 +- .../dht/GridPartitionedGetFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 3 + .../atomic/GridDhtAtomicOffHeapCacheEntry.java | 63 + .../dht/atomic/GridDhtAtomicUpdateResponse.java | 8 + .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- .../atomic/GridNearAtomicUpdateResponse.java | 18 +- .../dht/colocated/GridDhtColocatedCache.java | 5 +- .../colocated/GridDhtColocatedLockFuture.java | 33 +- .../GridDhtColocatedOffHeapCacheEntry.java | 63 + .../colocated/GridDhtDetachedCacheEntry.java | 4 +- .../dht/preloader/GridDhtForceKeysFuture.java | 6 + .../dht/preloader/GridDhtForceKeysResponse.java | 54 +- .../GridDhtPartitionsExchangeFuture.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 13 +- .../distributed/near/GridNearCacheEntry.java | 6 +- .../distributed/near/GridNearGetResponse.java | 8 +- .../distributed/near/GridNearLockFuture.java | 11 - .../distributed/near/GridNearLockRequest.java | 61 +- .../near/GridNearOffHeapCacheEntry.java | 60 + .../near/GridNearOptimisticTxPrepareFuture.java | 768 ++ .../GridNearPessimisticTxPrepareFuture.java | 347 + .../near/GridNearTransactionalCache.java | 4 - .../near/GridNearTxFinishRequest.java | 28 +- .../cache/distributed/near/GridNearTxLocal.java | 109 +- .../near/GridNearTxPrepareFuture.java | 1050 --- .../near/GridNearTxPrepareFutureAdapter.java | 231 + .../near/GridNearTxPrepareRequest.java | 52 +- .../near/GridNearTxPrepareResponse.java | 28 +- .../distributed/near/GridNearTxRemote.java | 24 +- .../processors/cache/local/GridLocalCache.java | 8 +- .../cache/local/GridLocalCacheEntry.java | 18 + .../local/atomic/GridLocalAtomicCache.java | 27 +- .../cache/query/GridCacheQueryManager.java | 21 +- .../cache/query/GridCacheSqlQuery.java | 2 +- .../cache/query/GridCacheTwoStepQuery.java | 17 + .../cache/transactions/IgniteInternalTx.java | 19 +- .../transactions/IgniteTransactionsImpl.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 76 +- .../cache/transactions/IgniteTxEntry.java | 48 +- .../cache/transactions/IgniteTxHandler.java | 106 +- .../transactions/IgniteTxLocalAdapter.java | 185 +- .../cache/transactions/IgniteTxLocalEx.java | 21 +- .../cache/transactions/IgniteTxManager.java | 247 +- .../datastreamer/DataStreamerImpl.java | 2 + .../processors/igfs/IgfsDataManager.java | 3 + .../processors/igfs/IgfsDeleteWorker.java | 4 + .../processors/igfs/IgfsMetaManager.java | 2 +- .../internal/processors/igfs/IgfsUtils.java | 11 +- .../offheap/GridOffHeapProcessor.java | 17 + .../processors/resource/GridResourceField.java | 16 +- .../processors/resource/GridResourceIoc.java | 389 +- .../processors/resource/GridResourceMethod.java | 13 + .../resource/GridResourceProcessor.java | 20 +- .../ignite/internal/util/IgniteUtils.java | 22 +- .../util/lang/GridComputeJobWrapper.java | 96 - .../util/lang/GridFilteredIterator.java | 2 +- .../ignite/internal/util/lang/GridFunc.java | 7218 +++++------------- .../internal/util/nio/GridBufferedParser.java | 4 - .../internal/util/nio/GridDelimitedParser.java | 91 + .../util/nio/GridNioDelimitedBuffer.java | 106 + .../util/offheap/GridOffHeapPartitionedMap.java | 9 + .../unsafe/GridUnsafePartitionedMap.java | 155 +- .../internal/visor/query/VisorQueryArg.java | 14 +- .../internal/visor/query/VisorQueryJob.java | 2 + .../apache/ignite/lang/IgniteAsyncSupport.java | 4 +- .../communication/tcp/TcpCommunicationSpi.java | 2 +- .../discovery/tcp/TcpClientDiscoverySpi.java | 4 - .../spi/discovery/tcp/TcpDiscoverySpi.java | 74 +- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 14 +- .../org/apache/ignite/stream/StreamAdapter.java | 111 + .../ignite/stream/StreamTupleExtractor.java | 33 + .../stream/socket/SocketMessageConverter.java | 31 + .../ignite/stream/socket/SocketStreamer.java | 218 + .../ignite/stream/socket/package-info.java | 21 + .../resources/META-INF/classnames.properties | 13 +- .../core/src/main/resources/ignite.properties | 2 +- .../internal/GridUpdateNotifierSelfTest.java | 30 +- .../processors/cache/CacheGetFromJobTest.java | 110 + .../cache/CacheOffheapMapEntrySelfTest.java | 168 + .../cache/CacheRemoveAllSelfTest.java | 81 + .../GridCacheAbstractFailoverSelfTest.java | 12 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 227 +- .../cache/GridCacheAbstractSelfTest.java | 4 +- .../cache/GridCacheSwapReloadSelfTest.java | 20 +- .../IgniteCacheEntryListenerAbstractTest.java | 4 +- .../cache/IgniteCacheNearLockValueSelfTest.java | 145 + .../IgniteCacheP2pUnmarshallingErrorTest.java | 189 + ...gniteCacheP2pUnmarshallingNearErrorTest.java | 56 + ...CacheP2pUnmarshallingRebalanceErrorTest.java | 80 + .../IgniteCacheP2pUnmarshallingTxErrorTest.java | 109 + .../cache/IgniteCachePeekModesAbstractTest.java | 15 +- .../cache/OffHeapTieredTransactionSelfTest.java | 127 + ...CacheLoadingConcurrentGridStartSelfTest.java | 163 + .../GridCacheAbstractNodeRestartSelfTest.java | 101 +- ...GridCacheLoadingConcurrentGridStartTest.java | 154 - .../distributed/GridCacheLockAbstractTest.java | 2 - .../distributed/IgniteTxGetAfterStopTest.java | 131 + ...xOriginatingNodeFailureAbstractSelfTest.java | 2 +- ...icOffHeapTieredMultiNodeFullApiSelfTest.java | 43 + ...achePartitionedNearDisabledLockSelfTest.java | 47 + ...ionedNearDisabledOffHeapFullApiSelfTest.java | 8 +- ...DisabledOffHeapMultiNodeFullApiSelfTest.java | 8 +- ...abledOffHeapTieredAtomicFullApiSelfTest.java | 56 + ...earDisabledOffHeapTieredFullApiSelfTest.java | 33 + ...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 + ...rDisabledPrimaryNodeFailureRecoveryTest.java | 31 + ...rtitionedPrimaryNodeFailureRecoveryTest.java | 31 + ...woBackupsPrimaryNodeFailureRecoveryTest.java | 37 + ...ePrimaryNodeFailureRecoveryAbstractTest.java | 533 ++ ...idCacheAtomicReplicatedFailoverSelfTest.java | 6 + ...CacheAtomicOffHeapTieredFullApiSelfTest.java | 32 + ...icOffHeapTieredMultiNodeFullApiSelfTest.java | 33 + ...yWriteOrderOffHeapTieredFullApiSelfTest.java | 33 + ...erOffHeapTieredMultiNodeFullApiSelfTest.java | 33 + ...achePartitionedMultiNodeFullApiSelfTest.java | 15 +- .../GridCachePartitionedNodeRestartTest.java | 4 +- ...dCachePartitionedOffHeapFullApiSelfTest.java | 8 +- ...titionedOffHeapMultiNodeFullApiSelfTest.java | 8 +- ...PartitionedOffHeapTieredFullApiSelfTest.java | 32 + ...edOffHeapTieredMultiNodeFullApiSelfTest.java | 72 + ...ePartitionedOptimisticTxNodeRestartTest.java | 4 +- .../GridCachePartitionedTxSalvageSelfTest.java | 25 +- .../near/IgniteCacheNearOnlyTxTest.java | 190 + .../near/NoneRebalanceModeSelfTest.java | 67 + .../GridCacheReplicatedFailoverSelfTest.java | 6 + .../GridCacheReplicatedLockSelfTest.java | 5 + .../GridCacheReplicatedNodeRestartSelfTest.java | 82 + ...idCacheReplicatedOffHeapFullApiSelfTest.java | 8 +- ...plicatedOffHeapMultiNodeFullApiSelfTest.java | 8 +- ...eReplicatedOffHeapTieredFullApiSelfTest.java | 33 + ...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 + .../IgniteCacheExpiryPolicyAbstractTest.java | 2 +- .../IgniteCacheExpiryPolicyTestSuite.java | 2 + .../expiry/IgniteCacheTtlCleanupSelfTest.java | 85 + ...LocalAtomicOffHeapTieredFullApiSelfTest.java | 32 + .../GridCacheLocalIsolatedNodesSelfTest.java | 18 +- .../GridCacheLocalOffHeapFullApiSelfTest.java | 6 +- ...dCacheLocalOffHeapTieredFullApiSelfTest.java | 32 + .../igfs/IgfsClientCacheSelfTest.java | 132 + .../processors/igfs/IgfsOneClientNodeTest.java | 133 + .../processors/igfs/IgfsStreamsSelfTest.java | 2 +- .../util/nio/GridNioDelimitedBufferTest.java | 112 + ...idFileSwapSpaceSpiMultithreadedLoadTest.java | 4 +- .../tcp/TcpClientDiscoverySelfTest.java | 8 + .../discovery/tcp/TcpDiscoveryRestartTest.java | 199 + .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 65 +- .../stream/socket/SocketStreamerSelfTest.java | 315 + .../ignite/stream/socket/package-info.java | 21 + .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + .../IgniteCacheFailoverTestSuite.java | 10 +- .../IgniteCacheFullApiSelfTestSuite.java | 18 + ...gniteCacheP2pUnmarshallingErrorTestSuit.java | 41 + .../testsuites/IgniteCacheRestartTestSuite.java | 11 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 299 +- .../testsuites/IgniteCacheTestSuite2.java | 144 + .../testsuites/IgniteCacheTestSuite3.java | 142 + .../testsuites/IgniteCacheTestSuite4.java | 135 + .../IgniteCacheTxRecoverySelfTestSuite.java | 4 + .../ignite/testsuites/IgniteIgfsTestSuite.java | 3 + .../testsuites/IgniteStreamTestSuite.java | 39 + .../testsuites/IgniteUtilSelfTestSuite.java | 1 + modules/extdata/p2p/pom.xml | 2 +- modules/extdata/uri/pom.xml | 2 +- modules/gce/pom.xml | 4 +- modules/geospatial/pom.xml | 2 +- modules/hadoop/pom.xml | 2 +- modules/hibernate/pom.xml | 2 +- modules/indexing/pom.xml | 2 +- .../processors/query/h2/IgniteH2Indexing.java | 4 + .../processors/query/h2/sql/GridSqlQuery.java | 20 + .../query/h2/sql/GridSqlQueryParser.java | 10 +- .../query/h2/sql/GridSqlQuerySplitter.java | 11 +- .../processors/query/h2/sql/GridSqlSelect.java | 2 +- .../processors/query/h2/sql/GridSqlUnion.java | 2 +- .../query/h2/twostep/GridMapQueryExecutor.java | 3 + .../h2/twostep/GridReduceQueryExecutor.java | 119 +- .../cache/GridCacheOffHeapAndSwapSelfTest.java | 11 +- .../cache/GridCacheOffHeapSelfTest.java | 11 +- .../cache/GridCacheOffheapIndexGetSelfTest.java | 111 + .../IgniteCacheAbstractFieldsQuerySelfTest.java | 21 + ...niteCacheP2pUnmarshallingQueryErrorTest.java | 56 + ...eQueryMultiThreadedOffHeapTiredSelfTest.java | 37 + .../IgniteCacheQueryMultiThreadedSelfTest.java | 29 +- .../IgniteCacheQuerySelfTestSuite.java | 4 + .../IgniteCacheWithIndexingTestSuite.java | 2 + modules/jcl/pom.xml | 2 +- modules/jta/pom.xml | 2 +- .../processors/cache/jta/CacheJtaManager.java | 4 +- modules/log4j/pom.xml | 2 +- modules/rest-http/pom.xml | 2 +- modules/scalar/pom.xml | 2 +- .../ignite/scalar/ScalarConversions.scala | 8 - .../scalar/tests/ScalarCacheQueriesSpec.scala | 154 +- .../ignite/scalar/tests/ScalarCacheSpec.scala | 23 +- .../scalar/tests/ScalarConversionsSpec.scala | 43 +- .../scalar/tests/ScalarProjectionSpec.scala | 128 +- .../scalar/tests/ScalarReturnableSpec.scala | 41 +- modules/schedule/pom.xml | 2 +- modules/schema-import/pom.xml | 8 +- .../ignite/schema/generator/CodeGenerator.java | 47 +- modules/slf4j/pom.xml | 2 +- modules/spring/pom.xml | 2 +- modules/ssh/pom.xml | 2 +- modules/tools/pom.xml | 2 +- modules/urideploy/pom.xml | 2 +- modules/visor-console/pom.xml | 4 +- .../commands/cache/VisorCacheScanCommand.scala | 2 +- .../ignite/visor/VisorRuntimeBaseSpec.scala | 2 +- .../visor/commands/VisorArgListSpec.scala | 60 +- .../commands/VisorFileNameCompleterSpec.scala | 34 +- .../commands/ack/VisorAckCommandSpec.scala | 20 +- .../commands/alert/VisorAlertCommandSpec.scala | 68 +- .../cache/VisorCacheClearCommandSpec.scala | 48 +- .../commands/cache/VisorCacheCommandSpec.scala | 66 +- .../config/VisorConfigurationCommandSpec.scala | 8 +- .../cswap/VisorCacheSwapCommandSpec.scala | 24 +- .../deploy/VisorDeployCommandSpec.scala | 10 +- .../disco/VisorDiscoveryCommandSpec.scala | 46 +- .../events/VisorEventsCommandSpec.scala | 28 +- .../visor/commands/gc/VisorGcCommandSpec.scala | 30 +- .../commands/help/VisorHelpCommandSpec.scala | 57 +- .../commands/kill/VisorKillCommandSpec.scala | 58 +- .../commands/log/VisorLogCommandSpec.scala | 10 +- .../commands/mem/VisorMemoryCommandSpec.scala | 77 +- .../commands/node/VisorNodeCommandSpec.scala | 22 +- .../commands/open/VisorOpenCommandSpec.scala | 16 +- .../commands/ping/VisorPingCommandSpec.scala | 16 +- .../commands/start/VisorStartCommandSpec.scala | 126 +- .../commands/tasks/VisorTasksCommandSpec.scala | 112 +- .../commands/top/VisorTopologyCommandSpec.scala | 52 +- .../commands/vvm/VisorVvmCommandSpec.scala | 30 +- modules/visor-plugins/pom.xml | 2 +- modules/web/pom.xml | 2 +- modules/yardstick/pom.xml | 2 +- parent/pom.xml | 5 + pom.xml | 305 +- 343 files changed, 15525 insertions(+), 12077 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index a3c0f6c,02f16c0..5f1f43e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@@ -24,7 -24,12 +24,11 @@@ import org.apache.ignite.internal.clust import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.affinity.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; + import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@@ -221,16 -228,67 +224,13 @@@ public class GridCacheIoManager extend unmarshall(nodeId, cacheMsg); - processMessage(nodeId, cacheMsg, c); + if (cacheMsg.classError() != null) + processFailedMessage(nodeId, cacheMsg); - else { - if (cacheMsg.allowForStartup()) - processMessage(nodeId, cacheMsg, c); - else { - IgniteInternalFuture<?> startFut = startFuture(cacheMsg); - - if (startFut.isDone()) - processMessage(nodeId, cacheMsg, c); - else { - if (log.isDebugEnabled()) - log.debug("Waiting for start future to complete for message [nodeId=" + nodeId + - ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); - - // Don't hold this thread waiting for preloading to complete. - startFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(final IgniteInternalFuture<?> f) { - cctx.kernalContext().closure().runLocalSafe( - new GridPlainRunnable() { - @Override public void run() { - rw.readLock(); - - try { - if (stopping) { - if (log.isDebugEnabled()) - log.debug("Received cache communication message while stopping " + - "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']'); - - return; - } - - f.get(); - - if (log.isDebugEnabled()) - log.debug("Start future completed for message [nodeId=" + nodeId + - ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); - - processMessage(nodeId, cacheMsg, c); - } - catch (IgniteCheckedException e) { - // Log once. - if (startErr.compareAndSet(false, true)) - U.error(log, "Failed to complete preload start future " + - "(will ignore message) " + - "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e); - } - finally { - rw.readUnlock(); - } - } - } - ); - } - }); - } - } - } ++ else ++ processMessage(nodeId, cacheMsg, c); } catch (Throwable e) { - if (X.hasCause(e, ClassNotFoundException.class)) - U.error(log, "Failed to process message (note that distributed services " + - "do not support peer class loading, if you deploy distributed service " + - "you should have all required classes in CLASSPATH on all nodes in topology) " + - "[senderId=" + nodeId + ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']'); - else - U.error(log, "Failed to process message [senderId=" + nodeId + ", msg=" + cacheMsg + ']', e); + U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e); if (e instanceof Error) throw (Error)e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 43adfbb,0e1a9c2..86cb7a7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -655,100 -659,79 +659,90 @@@ public class GridCacheProcessor extend } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "TypeMayBeWeakened"}) @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; ++ Collection<GridCacheAdapter> startCaches = new ArrayList<>(); + - ClusterNode locNode = ctx.discovery().localNode(); + try { + if (ctx.config().isDaemon()) + return; + + ClusterNode locNode = ctx.discovery().localNode(); + Collection<DynamicCacheDescriptor> initCaches = new ArrayList<>(F.view(registeredCaches.values(), + new IgnitePredicate<DynamicCacheDescriptor>() { + @Override public boolean apply(DynamicCacheDescriptor desc) { + return desc.locallyConfigured() || desc.receivedOnStart(); + } + })); + - // Init cache plugin managers. - final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); - - for (DynamicCacheDescriptor desc : initCaches) { - CacheConfiguration locCcfg = desc.cacheConfiguration(); - - CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); - - cache2PluginMgr.put(locCcfg.getName(), pluginMgr); - } - - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) { - checkTransactionConfiguration(n); + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) { + checkTransactionConfiguration(n); - DeploymentMode locDepMode = ctx.config().getDeploymentMode(); - DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", - locDepMode, rmtDepMode, true); + CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); - for (DynamicCacheDescriptor desc : initCaches) { - CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { ++ for (DynamicCacheDescriptor desc : initCaches) { + CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); - if (rmtCfg != null) { - CacheConfiguration locCfg = desc.cacheConfiguration(); + if (rmtCfg != null) { + CacheConfiguration locCfg = desc.cacheConfiguration(); - checkCache(locCfg, rmtCfg, n); + checkCache(locCfg, rmtCfg, n); - // Check plugin cache configurations. - CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); + // Check plugin cache configurations. + CachePluginManager pluginMgr = desc.pluginManager(); - assert pluginMgr != null : " Map=" + cache2PluginMgr; - - pluginMgr.validateRemotes(rmtCfg, n); + pluginMgr.validateRemotes(rmtCfg, n); + } } } } - } - - Collection<GridCacheAdapter> startCaches = new ArrayList<>(initCaches.size()); - // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - boolean started = desc.onStart(); + // Start dynamic caches received from collect discovery data. + for (DynamicCacheDescriptor desc : initCaches) { + boolean started = desc.onStart(); - assert started : "Failed to change started flag for locally configured cache: " + desc; + assert started : "Failed to change started flag for locally configured cache: " + desc; - desc.clearRemoteConfigurations(); + desc.clearRemoteConfigurations(); - CacheConfiguration ccfg = desc.cacheConfiguration(); + CacheConfiguration ccfg = desc.cacheConfiguration(); - IgnitePredicate filter = ccfg.getNodeFilter(); + IgnitePredicate filter = ccfg.getNodeFilter(); - if (filter.apply(locNode)) { - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); + if (filter.apply(locNode)) { + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); - - assert pluginMgr != null : "Map=" + cache2PluginMgr + ", "; - CachePluginManager pluginMgr = desc.pluginManager(); ++ CachePluginManager pluginMgr = desc.pluginManager(); - GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); + GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); - ctx.dynamicDeploymentId(desc.deploymentId()); + ctx.dynamicDeploymentId(desc.deploymentId()); - sharedCtx.addCacheContext(ctx); + sharedCtx.addCacheContext(ctx); - GridCacheAdapter cache = ctx.cache(); + GridCacheAdapter cache = ctx.cache(); - String name = ccfg.getName(); + String name = ccfg.getName(); - caches.put(maskNull(name), cache); + caches.put(maskNull(name), cache); - startCache(cache); + startCache(cache); - jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); - } + jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); + + startCaches.add(cache); } } ++ } + finally { + cacheStartedLatch.countDown(); + } ctx.marshallerContext().onMarshallerCacheStarted(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index c063657,841cac8..d523d6a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@@ -408,11 -398,10 +398,11 @@@ public class GridDhtTxLocal extends Gri } else { assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + - "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; + "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + + ", tx=" + this + ']'; // Prepare was called explicitly. - return fut; + return chainOnePhasePrepare(fut); } if (state() != PREPARING) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java index 4df1b9e,ba2c35f..38175fa --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java @@@ -28,10 -28,7 +28,10 @@@ import java.util.* /** * DHT transaction mapping. */ - public class GridDhtTxMapping<K, V> { + public class GridDhtTxMapping { + /** */ + private UUID locNodeId; + /** Transaction nodes mapping (primary node -> related backup nodes). */ private final Map<UUID, Collection<UUID>> txNodes = new GridLeanMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 6fae10b,293cf95..72262bc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@@ -686,10 -676,14 +680,11 @@@ public final class GridDhtTxPrepareFutu /** * Completeness callback. * + * @param res Response. * @return {@code True} if {@code done} flag was changed as a result of this call. */ - private boolean onComplete() { - if (super.onDone(tx, err.get())) { + private boolean onComplete(@Nullable GridNearTxPrepareResponse res) { - if (last || tx.isSystemInvalidate()) - tx.state(PREPARED); - + if (super.onDone(res, err.get())) { // Don't forget to clean up. cctx.mvcc().removeFuture(this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 0000000,4f74303..9588e7c mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@@ -1,0 -1,768 +1,768 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.near; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.cluster.*; + import org.apache.ignite.internal.processors.affinity.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.distributed.*; + import org.apache.ignite.internal.processors.cache.distributed.dht.*; + import org.apache.ignite.internal.processors.cache.transactions.*; + import org.apache.ignite.internal.transactions.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.internal.util.future.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.transactions.*; + import org.jetbrains.annotations.*; + import org.jsr166.*; + + import java.util.*; + import java.util.concurrent.atomic.*; + + import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; + import static org.apache.ignite.transactions.TransactionState.*; + + /** + * + */ + public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter + implements GridCacheMvccFuture<IgniteInternalTx> { + /** */ + private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.optimistic() : tx; + } + + /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { + if (log.isDebugEnabled()) + log.debug("Transaction future received owner changed callback: " + entry); + + if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { + lockKeys.remove(entry.txKey()); + + // This will check for locks. + onDone(); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<? extends ClusterNode> nodes() { + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); + + return cctx.discovery().localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + + for (IgniteInternalFuture<?> fut : futures()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture) fut; + + if (f.node().id().equals(nodeId)) { + f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + + found = true; + } + } + } + + return found; + } + + /** + * @param nodeId Failed node ID. + * @param mappings Remaining mappings. + * @param e Error. + */ + void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) { + if (err.compareAndSet(null, e)) { + boolean marked = tx.setRollbackOnly(); + + if (e instanceof IgniteTxOptimisticCheckedException) { + assert nodeId != null : "Missing node ID for optimistic failure exception: " + e; + + tx.removeKeysMapping(nodeId, mappings); + } + + if (e instanceof IgniteTxRollbackCheckedException) { + if (marked) { + try { + tx.rollback(); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to automatically rollback transaction: " + tx, ex); + } + } + } + + onComplete(); + } + } + + /** + * @return {@code True} if all locks are owned. + */ + private boolean checkLocks() { - boolean locked = lockKeys.isEmpty(); ++ boolean locked = lockKeys.isEmpty() && pending().isEmpty(); + + if (locked) { + if (log.isDebugEnabled()) + log.debug("All locks are acquired for near prepare future: " + this); + } + else { + if (log.isDebugEnabled()) + log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); + } + + return locked; + } + + /** {@inheritDoc} */ + @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (!isDone()) { + for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + f.onResult(nodeId, res); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(IgniteInternalTx t, Throwable err) { + // If locks were not acquired yet, delay completion. + if (isDone() || (err == null && !checkLocks())) + return false; + + this.err.compareAndSet(null, err); + + if (err == null) + tx.state(PREPARED); + + if (super.onDone(tx, err)) { + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + + return true; + } + + return false; + } + + /** + * @param f Future. + * @return {@code True} if mini-future. + */ + private boolean isMini(IgniteInternalFuture<?> f) { + return f.getClass().equals(MiniFuture.class); + } + + /** + * Completeness callback. + */ + private void onComplete() { + if (super.onDone(tx, err.get())) + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + } + + /** {@inheritDoc} */ + @Override public void prepare() { + // Obtain the topology version to use. + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + if (topVer != null) { + tx.topologyVersion(topVer); + + prepare0(); + + return; + } + + prepareOnTopology(); + } + + /** + * + */ + private void prepareOnTopology() { + GridDhtTopologyFuture topFut = topologyReadLock(); + + try { + if (topFut == null) { + assert isDone(); + + return; + } + + if (topFut.isDone()) { + StringBuilder invalidCaches = new StringBuilder(); + + boolean cacheInvalid = false; + + for (GridCacheContext ctx : cctx.cacheContexts()) { + if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) { + if (cacheInvalid) + invalidCaches.append(", "); + + invalidCaches.append(U.maskName(ctx.name())); + + cacheInvalid = true; + } + } + + if (cacheInvalid) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + invalidCaches.toString())); + + return; + } + + tx.topologyVersion(topFut.topologyVersion()); + + prepare0(); + } + else { + topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override public void run() { + prepareOnTopology(); + } + }); + } + }); + } + } + finally { + topologyReadUnlock(); + } + } + + /** + * Acquires topology read lock. + * + * @return Topology ready future. + */ + private GridDhtTopologyFuture topologyReadLock() { + if (tx.activeCacheIds().isEmpty()) + return cctx.exchange().lastTopologyFuture(); + + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx == null) + return cctx.exchange().lastTopologyFuture(); + + nonLocCtx.topology().readLock(); + + if (nonLocCtx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + + return nonLocCtx.topology().topologyVersionFuture(); + } + + /** + * Releases topology read lock. + */ + private void topologyReadUnlock() { + if (!tx.activeCacheIds().isEmpty()) { + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx != null) + nonLocCtx.topology().readUnlock(); + } + } + + /** + * Initializes future. + */ + private void prepare0() { + try { + if (!tx.state(PREPARING)) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } + else + onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); + + return; + } + + // Make sure to add future before calling prepare. + cctx.mvcc().addFuture(this); + + prepare( + tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), + tx.writeEntries()); + + markInitialized(); + } + catch (TransactionTimeoutException | TransactionOptimisticException e) { + onError(cctx.localNodeId(), null, e); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + + /** + * @param reads Read entries. + * @param writes Write entries. + * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node. + */ + private void prepare( + Iterable<IgniteTxEntry> reads, + Iterable<IgniteTxEntry> writes + ) throws IgniteCheckedException { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer.topologyVersion() > 0; + - txMapping = new GridDhtTxMapping(); ++ txMapping = new GridDhtTxMapping(cctx.localNodeId()); + + ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>(); + + if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { + onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name())); + + return; + } + } + } + + // Assign keys to primary nodes. + GridDistributedTxMapping cur = null; + + for (IgniteTxEntry read : reads) { + GridDistributedTxMapping updated = map(read, topVer, cur, false); + + if (cur != updated) { + mappings.offer(updated); + + if (updated.node().isLocal()) { + if (read.context().isNear()) + tx.nearLocallyMapped(true); + else if (read.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + cur = updated; + } + } + + for (IgniteTxEntry write : writes) { + GridDistributedTxMapping updated = map(write, topVer, cur, true); + + if (cur != updated) { + mappings.offer(updated); + + if (updated.node().isLocal()) { + if (write.context().isNear()) + tx.nearLocallyMapped(true); + else if (write.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + cur = updated; + } + } + + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } + + tx.addEntryMapping(mappings); + + cctx.mvcc().recheckPendingLocks(); + + txMapping.initLast(mappings); + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + proceedPrepare(mappings); + } + + /** + * Continues prepare after previous mapping successfully finished. + * + * @param mappings Queue of mappings. + */ + private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) { + if (isDone()) + return; + + final GridDistributedTxMapping m = mappings.poll(); + + if (m == null) + return; + + assert !m.empty(); + + final ClusterNode n = m.node(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + tx.optimistic() && tx.serializable() ? m.reads() : null, + m.writes(), + m.near(), + txMapping.transactionNodes(), + m.last(), + m.lastBackups(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash()); + + for (IgniteTxEntry txEntry : m.writes()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + // Must lock near entries separately. + if (m.near()) { + try { + tx.optimisticLockEntries(req.writes()); + + tx.userPrepare(); + } + catch (IgniteCheckedException e) { + onError(null, null, e); + } + } + + final MiniFuture fut = new MiniFuture(m, mappings); + + req.miniId(fut.futureId()); + + add(fut); // Append new future. + + // If this is the primary node for the keys. + if (n.isLocal()) { + // At this point, if any new node joined, then it is + // waiting for this transaction to complete, so + // partition reassignments are not possible here. + IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + + prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { + @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { + try { + fut.onResult(n.id(), prepFut.get()); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } + } + }); + } + else { + try { + cctx.io().send(n, req, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + // Fail the whole thing. + fut.onResult(e); + } + } + } + + /** + * @param entry Transaction entry. + * @param topVer Topology version. + * @param cur Current mapping. + * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key. + * @return Mapping. + */ + private GridDistributedTxMapping map( + IgniteTxEntry entry, + AffinityTopologyVersion topVer, + GridDistributedTxMapping cur, + boolean waitLock + ) throws IgniteCheckedException { + GridCacheContext cacheCtx = entry.context(); + + List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + - txMapping.addMapping(nodes); - + ClusterNode primary = F.first(nodes); + ++ txMapping.addMapping(nodes, cacheCtx.isNear()); ++ + assert primary != null; + + if (log.isDebugEnabled()) { + log.debug("Mapped key to primary node [key=" + entry.key() + + ", part=" + cacheCtx.affinity().partition(entry.key()) + + ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']'); + } + + // Must re-initialize cached entry while holding topology lock. + if (cacheCtx.isNear()) + entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); + else if (!cacheCtx.isLocal()) + entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true)); + else + entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); + + if (cacheCtx.isNear() || cacheCtx.isLocal()) { + if (waitLock && entry.explicitVersion() == null) + lockKeys.add(entry.txKey()); + } + + if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { + cur = new GridDistributedTxMapping(primary); + + // Initialize near flag right away. + cur.near(cacheCtx.isNear()); + } + + cur.add(entry); + + if (entry.explicitVersion() != null) { + tx.markExplicit(primary.id()); + + cur.markExplicitLock(); + } + + entry.nodeId(primary.id()); + + if (cacheCtx.isNear()) { + while (true) { + try { + GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached(); + + cached.dhtNodeId(tx.xidVersion(), primary.id()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + entry.cached(cacheCtx.near().entryEx(entry.key())); + } + } + } + + return cur; + } + + /** {@inheritDoc} */ + @Override public String toString() { + Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { + @Override public String apply(IgniteInternalFuture<?> f) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + }); + + return S.toString(GridNearOptimisticTxPrepareFuture.class, this, + "futs", futs, + "super", super.toString()); + } + + /** + * + */ + private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** Keys. */ + @GridToStringInclude + private GridDistributedTxMapping m; + + /** Flag to signal some result being processed. */ + private AtomicBoolean rcvRes = new AtomicBoolean(false); + + /** Mappings to proceed prepare. */ + private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings; + + /** + * @param m Mapping. + * @param mappings Queue of mappings to proceed with. + */ + MiniFuture( + GridDistributedTxMapping m, + ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings + ) { + this.m = m; + this.mappings = mappings; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return m.node(); + } + + /** + * @return Keys. + */ + public GridDistributedTxMapping mapping() { + return m; + } + + /** + * @param e Error. + */ + void onResult(Throwable e) { + if (rcvRes.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); + + // Fail. + onDone(e); + } + else + U.warn(log, "Received error after another result has been processed [fut=" + + GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e); + } + + /** + * @param e Node failure. + */ + void onResult(ClusterTopologyCheckedException e) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); + + // Fail the whole future (make sure not to remap on different primary node + // to prevent multiple lock coordinators). + onError(null, null, e); + } + } + + /** + * @param nodeId Failed node ID. + * @param res Result callback. + */ + void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (res.error() != null) { + // Fail the whole compound future. + onError(nodeId, mappings, res.error()); + } + else { + onPrepareResponse(m, res); + + // Proceed prepare before finishing mini future. + if (mappings != null) + proceedPrepare(mappings); + + // Finish this mini future. + onDone(tx); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index da3911b,c38965d..9e8d386 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@@ -563,36 -547,34 +547,34 @@@ public class GridNearTxLocal extends Gr Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { - Collection<IgniteTxEntry> entries = groupLock() ? - Collections.singletonList(groupLockEntry()) : - F.concat(false, mapping.reads(), mapping.writes()); + Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes()); for (IgniteTxEntry txEntry : entries) { - while (true) { - GridCacheContext cacheCtx = txEntry.cached().context(); + GridCacheContext cacheCtx = txEntry.cached().context(); - assert cacheCtx.isNear(); + if (cacheCtx.isNear()) { + while (true) { + GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached(); - GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached(); - - try { - // Handle explicit locks. - GridCacheVersion explicit = txEntry.explicitVersion(); + try { + // Handle explicit locks. + GridCacheVersion explicit = txEntry.explicitVersion(); - if (explicit == null) - entry.readyNearLock(xidVer, mapping.dhtVersion(), committedVers, rolledbackVers, pendingVers); + if (explicit == null) + entry.readyNearLock(xidVer, mapping.dhtVersion(), committedVers, rolledbackVers, pendingVers); - break; - } - catch (GridCacheEntryRemovedException ignored) { - assert entry.obsoleteVersion() != null; + break; + } + catch (GridCacheEntryRemovedException ignored) { + assert entry.obsoleteVersion() != null; - if (log.isDebugEnabled()) - log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + - ", tx=" + this + ']'); + if (log.isDebugEnabled()) + log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + + ", tx=" + this + ']'); - // Replace the entry. - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); + // Replace the entry. + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89d98358/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ----------------------------------------------------------------------