IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1552a4b2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1552a4b2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1552a4b2 Branch: refs/heads/ignite-883-1 Commit: 1552a4b2474bc92ed3aa654c2a477cf444d88e0a Parents: fa97def 0fa2853 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Jun 8 15:27:35 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Jun 8 15:27:35 2015 -0700 ---------------------------------------------------------------------- DEVNOTES.txt | 42 +- .../java/org/apache/ignite/IgniteCache.java | 16 + .../apache/ignite/internal/IgniteKernal.java | 28 +- .../ignite/internal/IgniteNodeAttributes.java | 5 +- .../org/apache/ignite/internal/IgnitionEx.java | 8 +- .../internal/MarshallerContextAdapter.java | 36 +- .../internal/managers/GridManagerAdapter.java | 9 + .../checkpoint/GridCheckpointManager.java | 52 +- .../discovery/GridDiscoveryManager.java | 28 +- .../affinity/GridAffinityProcessor.java | 23 +- .../cache/DynamicCacheDescriptor.java | 17 + .../processors/cache/GridCacheAdapter.java | 21 +- .../processors/cache/GridCacheContext.java | 13 + .../GridCachePartitionExchangeManager.java | 26 +- .../processors/cache/GridCacheProcessor.java | 37 +- .../processors/cache/GridCacheProxyImpl.java | 14 +- .../processors/cache/GridCacheTtlManager.java | 9 +- .../processors/cache/IgniteCacheProxy.java | 23 + .../processors/cache/IgniteInternalCache.java | 11 +- .../dht/atomic/GridDhtAtomicCache.java | 22 +- .../dht/preloader/GridDhtForceKeysFuture.java | 40 +- .../GridDhtPartitionsExchangeFuture.java | 50 +- .../transactions/IgniteTxLocalAdapter.java | 28 + .../cache/transactions/IgniteTxManager.java | 3 - .../datastreamer/DataStreamerImpl.java | 92 ++- .../datastructures/DataStructuresProcessor.java | 107 +++- .../processors/igfs/IgfsMetaManager.java | 2 +- .../service/GridServiceProcessor.java | 4 +- .../timeout/GridSpiTimeoutObject.java | 73 +++ .../timeout/GridTimeoutProcessor.java | 105 +++- .../IgniteTxRollbackCheckedException.java | 9 + .../util/nio/GridCommunicationClient.java | 30 +- .../util/nio/GridNioRecoveryDescriptor.java | 13 +- .../util/nio/GridTcpCommunicationClient.java | 554 ------------------- .../util/nio/GridTcpNioCommunicationClient.java | 8 - .../ignite/internal/visor/cache/VisorCache.java | 2 +- .../VisorCacheConfigurationCollectorJob.java | 6 +- .../internal/visor/cache/VisorCacheMetrics.java | 19 +- .../cache/VisorCacheMetricsCollectorTask.java | 10 +- .../cache/VisorCacheStoreConfiguration.java | 5 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 27 +- .../org/apache/ignite/spi/IgniteSpiContext.java | 10 + .../ignite/spi/IgniteSpiTimeoutObject.java | 44 ++ .../spi/checkpoint/noop/NoopCheckpointSpi.java | 3 +- .../communication/tcp/TcpCommunicationSpi.java | 443 ++++----------- .../tcp/TcpCommunicationSpiMBean.java | 2 - .../ignite/spi/discovery/tcp/ClientImpl.java | 3 - .../ignite/spi/discovery/tcp/ServerImpl.java | 10 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 156 +----- ...acheReadOnlyTransactionalClientSelfTest.java | 327 +++++++++++ .../cache/GridCacheAbstractFullApiSelfTest.java | 83 +++ .../GridCacheExAbstractFullApiSelfTest.java | 103 ---- .../IgniteCountDownLatchAbstractSelfTest.java | 102 ++++ .../GridCacheExColocatedFullApiSelfTest.java | 33 -- .../near/GridCacheExNearFullApiSelfTest.java | 39 -- .../GridCacheExReplicatedFullApiSelfTest.java | 33 -- .../IgniteCacheClientNearCacheExpiryTest.java | 103 ++++ .../IgniteCacheExpiryPolicyTestSuite.java | 2 + .../local/GridCacheExLocalFullApiSelfTest.java | 30 - .../DataStreamerMultiThreadedSelfTest.java | 59 +- .../DataStreamerMultinodeCreateCacheTest.java | 97 ++++ .../internal/util/nio/GridNioSelfTest.java | 2 +- .../loadtests/hashmap/GridCacheTestContext.java | 1 + .../IgniteMessagingWithClientTest.java | 2 + .../GridTcpCommunicationSpiAbstractTest.java | 4 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 2 +- .../GridTcpCommunicationSpiConfigSelfTest.java | 2 - ...cpCommunicationSpiMultithreadedSelfTest.java | 2 +- .../discovery/AbstractDiscoverySelfTest.java | 13 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 25 + .../testframework/GridSpiTestContext.java | 10 + .../IgniteCacheFullApiSelfTestSuite.java | 6 - .../ignite/testsuites/IgniteCacheTestSuite.java | 1 + .../testsuites/IgniteCacheTestSuite4.java | 2 + 74 files changed, 1825 insertions(+), 1556 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/DEVNOTES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 3768db5,359de1c..a661965 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@@ -691,23 -634,8 +678,14 @@@ public class TcpCommunicationSpi extend /** Socket write timeout. */ private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; + /** Recovery and idle clients handler. */ + private CommunicationWorker commWorker; ++ + /** Shared memory accept worker. */ + private ShmemAcceptWorker shmemAcceptWorker; + - /** Idle client worker. */ - private IdleClientWorker idleClientWorker; - - /** Flush client worker. */ - private ClientFlushWorker clientFlushWorker; - - /** Socket timeout worker. */ - private SocketTimeoutWorker sockTimeoutWorker; - - /** Recovery worker. */ - private RecoveryWorker recoveryWorker; - + /** Shared memory workers. */ + private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>(); /** Clients. */ private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap(); @@@ -1354,31 -1239,11 +1321,17 @@@ registerMBean(gridName, this, TcpCommunicationSpiMBean.class); + if (shmemSrv != null) { + shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv); + + new IgniteThread(shmemAcceptWorker).start(); + } + nioSrvr.start(); - idleClientWorker = new IdleClientWorker(); + commWorker = new CommunicationWorker(); - idleClientWorker.start(); - - recoveryWorker = new RecoveryWorker(); - - recoveryWorker.start(); - - if (connBufSize > 0) { - clientFlushWorker = new ClientFlushWorker(); - - clientFlushWorker.start(); - } - - sockTimeoutWorker = new SocketTimeoutWorker(); - - sockTimeoutWorker.start(); + commWorker.start(); // Ack start. if (log.isDebugEnabled()) @@@ -1586,24 -1398,10 +1539,17 @@@ if (nioSrvr != null) nioSrvr.stop(); + U.interrupt(commWorker); - + U.join(commWorker, log); + + U.cancel(shmemAcceptWorker); + U.join(shmemAcceptWorker, log); + - U.interrupt(idleClientWorker); - U.interrupt(clientFlushWorker); - U.interrupt(sockTimeoutWorker); - U.interrupt(recoveryWorker); - - U.join(idleClientWorker, log); - U.join(clientFlushWorker, log); - U.join(sockTimeoutWorker, log); - U.join(recoveryWorker, log); - + U.cancel(shmemWorkers); + U.join(shmemWorkers, log); + + shmemWorkers.clear(); + // Force closing on stop (safety). for (GridCommunicationClient client : clients.values()) client.forceClose(); @@@ -2400,147 -2095,12 +2340,150 @@@ } /** + * This worker takes responsibility to shut the server down when stopping, + * No other thread shall stop passed server. + */ + private class ShmemAcceptWorker extends GridWorker { + /** */ + private final IpcSharedMemoryServerEndpoint srv; + + /** + * @param srv Server. + */ + ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { + super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log); + + this.srv = srv; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + try { + while (!Thread.interrupted()) { + ShmemWorker e = new ShmemWorker(srv.accept()); + + shmemWorkers.add(e); + + new IgniteThread(e).start(); + } + } + catch (IgniteCheckedException e) { + if (!isCancelled()) + U.error(log, "Shmem server failed.", e); + } + finally { + srv.close(); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + super.cancel(); + + srv.close(); + } + } + + /** + * + */ + private class ShmemWorker extends GridWorker { + /** */ + private final IpcEndpoint endpoint; + + /** + * @param endpoint Endpoint. + */ + private ShmemWorker(IpcEndpoint endpoint) { + super(gridName, "shmem-worker", TcpCommunicationSpi.this.log); + + this.endpoint = endpoint; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + try { + MessageFactory msgFactory = new MessageFactory() { + private MessageFactory impl; + + @Nullable @Override public Message create(byte type) { + if (impl == null) + impl = getSpiContext().messageFactory(); + + assert impl != null; + + return impl.create(type); + } + }; + + MessageFormatter msgFormatter = new MessageFormatter() { + private MessageFormatter impl; + + @Override public MessageWriter writer() { + if (impl == null) + impl = getSpiContext().messageFormatter(); + + assert impl != null; + + return impl.writer(); + } + + @Override public MessageReader reader(MessageFactory factory) { + if (impl == null) + impl = getSpiContext().messageFormatter(); + + assert impl != null; + + return impl.reader(factory); + } + }; + + IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>( + metricsLsnr, + log, + endpoint, + srvLsnr, + msgFormatter, + new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true), + new GridConnectionBytesVerifyFilter(log) + ); + + adapter.serve(); + } + finally { + shmemWorkers.remove(this); + + endpoint.close(); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + super.cancel(); + + endpoint.close(); + } + + /** @{@inheritDoc} */ + @Override protected void cleanup() { + super.cleanup(); + + endpoint.close(); + } + + /** @{@inheritDoc} */ + @Override public String toString() { + return S.toString(ShmemWorker.class, this); + } + } + + /** * */ - private class IdleClientWorker extends IgniteSpiThread { + private class CommunicationWorker extends IgniteSpiThread { + /** */ + private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); + /** * */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1552a4b2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java ----------------------------------------------------------------------