ignite-sprint-6: merge from ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5d007e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5d007e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5d007e3 Branch: refs/heads/ignite-484-1 Commit: a5d007e323af2d6619c11fb698992c8020d5eefa Parents: d4b9731 Author: Denis Magda <dma...@gridgain.com> Authored: Thu Jun 11 12:34:49 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Jun 11 12:34:49 2015 +0300 ---------------------------------------------------------------------- DEVNOTES.txt | 6 + assembly/dependencies-fabric.xml | 1 + examples/pom.xml | 34 ++ modules/core/pom.xml | 1 - .../apache/ignite/IgniteSystemProperties.java | 3 + .../apache/ignite/cache/query/ScanQuery.java | 45 +- .../configuration/CacheConfiguration.java | 1 - .../affinity/GridAffinityAssignmentCache.java | 5 +- .../processors/cache/GridCacheAdapter.java | 15 +- .../GridCachePartitionExchangeManager.java | 2 +- .../processors/cache/GridCacheProcessor.java | 30 +- .../processors/cache/GridCacheSwapManager.java | 55 ++- .../processors/cache/IgniteCacheProxy.java | 11 +- .../processors/cache/QueryCursorImpl.java | 23 +- .../distributed/dht/GridDhtLocalPartition.java | 7 + .../processors/cache/query/CacheQuery.java | 2 +- .../query/GridCacheDistributedQueryManager.java | 3 + .../cache/query/GridCacheQueryAdapter.java | 147 ++++++- .../cache/query/GridCacheQueryManager.java | 209 ++++++---- .../cache/query/GridCacheQueryRequest.java | 47 ++- .../processors/cache/query/QueryCursorEx.java | 8 + .../datastructures/GridCacheSetImpl.java | 4 +- .../processors/query/GridQueryIndexing.java | 4 +- .../processors/query/GridQueryProcessor.java | 18 +- .../service/GridServiceProcessor.java | 2 +- .../ignite/internal/util/GridJavaProcess.java | 2 +- .../ignite/internal/util/IgniteUtils.java | 4 +- .../shmem/IpcSharedMemoryClientEndpoint.java | 2 +- .../ipc/shmem/IpcSharedMemoryNativeLoader.java | 151 ++++++- .../shmem/IpcSharedMemoryServerEndpoint.java | 2 +- .../util/nio/GridShmemCommunicationClient.java | 146 +++++++ .../communication/tcp/TcpCommunicationSpi.java | 415 ++++++++++++++++++- .../tcp/TcpCommunicationSpiMBean.java | 8 + .../cache/GridCacheAbstractFullApiSelfTest.java | 15 + .../cache/IgniteDynamicCacheStartSelfTest.java | 19 + .../distributed/IgniteCacheManyClientsTest.java | 169 ++++++++ .../IgniteCacheMessageRecoveryAbstractTest.java | 1 + ...achePartitionedPreloadLifecycleSelfTest.java | 2 +- ...CacheReplicatedPreloadLifecycleSelfTest.java | 6 +- .../GridCacheSwapScanQueryAbstractSelfTest.java | 112 +++-- .../ipc/shmem/IgfsSharedMemoryTestServer.java | 2 + .../IpcSharedMemoryCrashDetectionSelfTest.java | 2 +- .../ipc/shmem/IpcSharedMemorySpaceSelfTest.java | 2 +- .../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java | 2 +- .../LoadWithCorruptedLibFileTestRunner.java | 2 +- .../IpcSharedMemoryBenchmarkReader.java | 2 +- .../IpcSharedMemoryBenchmarkWriter.java | 2 +- .../communication/GridIoManagerBenchmark0.java | 1 + .../spi/GridTcpSpiForwardingSelfTest.java | 1 + .../GridTcpCommunicationSpiAbstractTest.java | 13 + ...mmunicationSpiConcurrentConnectSelfTest.java | 4 +- ...cpCommunicationSpiMultithreadedSelfTest.java | 21 +- ...pCommunicationSpiMultithreadedShmemTest.java | 28 ++ ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 + ...GridTcpCommunicationSpiRecoverySelfTest.java | 1 + .../GridTcpCommunicationSpiShmemSelfTest.java | 38 ++ .../tcp/GridTcpCommunicationSpiTcpSelfTest.java | 7 + .../testsuites/IgniteCacheTestSuite4.java | 2 + .../IgniteSpiCommunicationSelfTestSuite.java | 2 + modules/hadoop/pom.xml | 1 + .../HadoopIgfs20FileSystemAbstractSelfTest.java | 13 + ...oopSecondaryFileSystemConfigurationTest.java | 14 + ...IgniteHadoopFileSystemHandshakeSelfTest.java | 7 + .../IgniteHadoopFileSystemIpcCacheSelfTest.java | 7 + .../hadoop/HadoopAbstractSelfTest.java | 7 + .../processors/query/h2/IgniteH2Indexing.java | 44 +- .../h2/twostep/GridReduceQueryExecutor.java | 8 +- ...CacheScanPartitionQueryFallbackSelfTest.java | 408 ++++++++++++++++++ .../cache/GridCacheCrossCacheQuerySelfTest.java | 12 +- .../cache/IgniteCacheAbstractQuerySelfTest.java | 77 +++- .../IgniteCacheQuerySelfTestSuite.java | 2 + modules/scalar-2.10/README.txt | 4 + modules/scalar-2.10/licenses/apache-2.0.txt | 202 +++++++++ .../scalar-2.10/licenses/scala-bsd-license.txt | 18 + modules/scalar-2.10/pom.xml | 197 +++++++++ modules/spark-2.10/README.txt | 4 + modules/spark-2.10/licenses/apache-2.0.txt | 202 +++++++++ .../spark-2.10/licenses/scala-bsd-license.txt | 18 + modules/spark-2.10/pom.xml | 120 ++++++ modules/spark/README.txt | 8 + modules/spark/licenses/apache-2.0.txt | 202 +++++++++ modules/spark/licenses/scala-bsd-license.txt | 18 + modules/spark/pom.xml | 114 +++++ .../org/apache/ignite/spark/IgniteContext.scala | 119 ++++++ .../org/apache/ignite/spark/IgniteRDD.scala | 244 +++++++++++ .../apache/ignite/spark/JavaIgniteContext.scala | 63 +++ .../org/apache/ignite/spark/JavaIgniteRDD.scala | 99 +++++ .../ignite/spark/impl/IgniteAbstractRDD.scala | 39 ++ .../ignite/spark/impl/IgnitePartition.scala | 24 ++ .../ignite/spark/impl/IgniteQueryIterator.scala | 27 ++ .../apache/ignite/spark/impl/IgniteSqlRDD.scala | 41 ++ .../spark/impl/JavaIgniteAbstractRDD.scala | 34 ++ .../ignite/spark/JavaIgniteRDDSelfTest.java | 298 +++++++++++++ .../scala/org/apache/ignite/spark/Entity.scala | 28 ++ .../org/apache/ignite/spark/IgniteRddSpec.scala | 231 +++++++++++ modules/visor-console-2.10/README.txt | 4 + modules/visor-console-2.10/pom.xml | 174 ++++++++ parent/pom.xml | 4 + pom.xml | 20 +- 99 files changed, 4773 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/DEVNOTES.txt ---------------------------------------------------------------------- diff --git a/DEVNOTES.txt b/DEVNOTES.txt index cd72418..d02e6ba 100644 --- a/DEVNOTES.txt +++ b/DEVNOTES.txt @@ -3,9 +3,15 @@ Ignite Fabric Maven Build Instructions Without LGPL dependencies (default): mvn clean package -DskipTests +Without LGPL dependencies and Scala 2.10: + mvn clean package -DskipTests -Dscala-2.10 + With LGPL dependencies: mvn clean package -DskipTests -Prelease,lgpl +With LGPL dependencies and Scala 2.10: + mvn clean package -DskipTests -Prelease,lgpl -Dscala-2.10 + Look for incubator-ignite-<version>-bin.zip in ./target/bin directory. NOTE: JDK version should be 1.7.0-* or >= 1.8.0-u40. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/assembly/dependencies-fabric.xml ---------------------------------------------------------------------- diff --git a/assembly/dependencies-fabric.xml b/assembly/dependencies-fabric.xml index a294243..c6668f6 100644 --- a/assembly/dependencies-fabric.xml +++ b/assembly/dependencies-fabric.xml @@ -113,6 +113,7 @@ <exclude>org.apache.ignite:ignite-examples</exclude> <exclude>org.apache.ignite:ignite-indexing</exclude> <exclude>org.apache.ignite:ignite-visor-console</exclude> + <exclude>org.apache.ignite:ignite-visor-console_2.10</exclude> <exclude>org.apache.ignite:ignite-visor-plugins</exclude> <exclude>org.apache.ignite:ignite-visor-trial</exclude> <exclude>org.apache.ignite:ignite-hadoop</exclude> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 78c5852..a775987 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -173,6 +173,40 @@ </profile> <profile> + <id>scala-2.10</id> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-scalar_2.10</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_2.10</artifactId> + <version>2.2.2</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </profile> + + <profile> <id>java8-examples</id> <activation> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/core/pom.xml b/modules/core/pom.xml index 0460b46..47ed9cb 100644 --- a/modules/core/pom.xml +++ b/modules/core/pom.xml @@ -129,7 +129,6 @@ <groupId>org.gridgain</groupId> <artifactId>ignite-shmem</artifactId> <version>1.0.0</version> - <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 439ea2d..b166f39 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -337,6 +337,9 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_SQL_MERGE_TABLE_MAX_SIZE = "IGNITE_SQL_MERGE_TABLE_MAX_SIZE"; + /** Maximum size for affinity assignment history. */ + public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java index 688eb2e..e6b69bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java @@ -36,11 +36,23 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { /** */ private IgniteBiPredicate<K, V> filter; + /** */ + private Integer part; + /** * Create scan query returning all entries. */ public ScanQuery() { - this(null); + this(null, null); + } + + /** + * Creates partition scan query returning all entries for given partition. + * + * @param part Partition. + */ + public ScanQuery(int part) { + this(part, null); } /** @@ -49,6 +61,17 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { * @param filter Filter. If {@code null} then all entries will be returned. */ public ScanQuery(@Nullable IgniteBiPredicate<K, V> filter) { + this(null, filter); + } + + /** + * Create scan query with filter. + * + * @param part Partition. + * @param filter Filter. If {@code null} then all entries will be returned. + */ + public ScanQuery(@Nullable Integer part, @Nullable IgniteBiPredicate<K, V> filter) { + setPartition(part); setFilter(filter); } @@ -73,6 +96,26 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { return this; } + /** + * Gets partition number over which this query should iterate. Will return {@code null} if partition was not + * set. In this case query will iterate over all partitions in the cache. + * + * @return Partition number or {@code null}. + */ + @Nullable public Integer getPartition() { + return part; + } + + /** + * Sets partition number over which this query should iterate. If {@code null}, query will iterate over + * all partitions in the cache. Must be in the range [0, N) where N is partition number in the cache. + * + * @param part Partition number over which this query should iterate. + */ + public void setPartition(@Nullable Integer part) { + this.part = part; + } + /** {@inheritDoc} */ @Override public ScanQuery<K, V> setPageSize(int pageSize) { return (ScanQuery<K, V>)super.setPageSize(pageSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 1aa4fd6..a16438c 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -774,7 +774,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * * @param loadPrevVal Load previous value flag. * @return {@code this} for chaining. - * @return {@code this} for chaining. */ public CacheConfiguration setLoadPreviousValue(boolean loadPrevVal) { this.loadPrevVal = loadPrevVal; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 47f222e..6989385 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -408,9 +408,10 @@ public class GridAffinityAssignmentCache { throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " + "calculated [locNodeId=" + ctx.localNodeId() + ", cache=" + cacheName + - ", history=" + affCache.keySet() + ", topVer=" + topVer + - ", head=" + head.get().topologyVersion() + ']'); + ", head=" + head.get().topologyVersion() + + ", history=" + affCache.keySet() + + ']'); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d8d029e..2ca7687 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1337,8 +1337,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>(); - IgniteInternalFuture<Object> readFut = - readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() { + IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null, + subjId, taskName, new CI2<KeyCacheObject, Object>() { /** Version for all loaded entries. */ private GridCacheVersion nextVer = ctx.versions().next(); @@ -1968,7 +1968,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param filter Optional filter. * @return Put operation future. */ - public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) { + public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, + @Nullable final CacheEntryPredicate... filter) { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -3137,7 +3138,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException { + @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) + throws IgniteCheckedException { if (F.isEmpty(keys)) return true; @@ -3711,7 +3713,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1) return localIteratorHonorExpirePolicy(opCtx); - CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable()) + CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable()) .keepAll(false) .execute(); @@ -3944,7 +3946,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return t; } - catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | IgniteTxRollbackCheckedException e) { + catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | + IgniteTxRollbackCheckedException e) { throw e; } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 3236bb5..3df45cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -59,7 +59,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private static final int EXCHANGE_HISTORY_SIZE = 1000; /** Cleanup history size. */ - public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = 10; + public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100); /** Atomic reference for pending timeout object. */ private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 871cd77..5582ba7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1962,7 +1962,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.cacheType(cacheType); - return F.first(initiateCacheChanges(F.asList(req))); + return F.first(initiateCacheChanges(F.asList(req), failIfExists)); } /** @@ -1972,14 +1972,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { public IgniteInternalFuture<?> dynamicStopCache(String cacheName) { DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true); - return F.first(initiateCacheChanges(F.asList(t))); + return F.first(initiateCacheChanges(F.asList(t), false)); } /** * @param reqs Requests. * @return Collection of futures. */ - public Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs) { + @SuppressWarnings("TypeMayBeWeakened") + private Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs, + boolean failIfExists) { Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size()); Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size()); @@ -2012,9 +2014,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { maskNull(req.cacheName()), fut); if (old != null) { - if (req.start() && !req.clientStartOnly()) { - fut.onDone(new CacheExistsException("Failed to start cache " + - "(a cache with the same name is already being started or stopped): " + req.cacheName())); + if (req.start()) { + if (!req.clientStartOnly()) { + if (failIfExists) + fut.onDone(new CacheExistsException("Failed to start cache " + + "(a cache with the same name is already being started or stopped): " + + req.cacheName())); + else { + fut = old; + + continue; + } + } + else { + fut = old; + + continue; + } } else { fut = old; @@ -2664,7 +2680,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.clientStartOnly(true); - F.first(initiateCacheChanges(F.asList(req))).get(); + F.first(initiateCacheChanges(F.asList(req), false)).get(); IgniteCacheProxy cache = jCacheProxies.get(masked); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 772e849..d0d9049 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1251,7 +1251,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); if (offHeapEnabled() && !swapEnabled()) - return rawOffHeapIterator(true, true); + return rawOffHeapIterator(null, true, true); if (swapEnabled() && !offHeapEnabled()) return rawSwapIterator(true, true); @@ -1267,7 +1267,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { private Map.Entry<byte[], byte[]> cur; { - it = rawOffHeapIterator(true, true); + it = rawOffHeapIterator(null, true, true); advance(); } @@ -1598,11 +1598,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param c Key/value closure. + * @param part Partition. * @param primary Include primaries. * @param backup Include backups. * @return Off-heap iterator. */ public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, + Integer part, boolean primary, boolean backup) { @@ -1618,24 +1620,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set<Integer> parts; + + if (part == null) + parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : + cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + else + parts = Collections.singleton(part); return new CloseablePartitionsIterator<T, T>(parts) { @Override protected GridCloseableIterator<T> partitionIterator(int part) - throws IgniteCheckedException - { + throws IgniteCheckedException { return offheap.iterator(spaceName, c, part); } }; } /** + * + * @param part Partition. * @param primary Include primaries. * @param backup Include backups. * @return Raw off-heap iterator. */ - public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary, + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Nullable Integer part, + final boolean primary, final boolean backup) { if (!offheapEnabled || (!primary && !backup)) @@ -1673,8 +1682,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set<Integer> parts; + + if (part == null) + parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : + cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + else + parts = Collections.singleton(part); return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) { private Map.Entry<byte[], byte[]> cur; @@ -1751,6 +1765,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param part Partition. + * @return Raw off-heap iterator. + * @throws IgniteCheckedException If failed. + */ + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part) + throws IgniteCheckedException + { + if (!swapEnabled) + return new GridEmptyCloseableIterator<>(); + + checkIteratorQueue(); + + return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>( + Collections.singleton(part)) { + @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part) + throws IgniteCheckedException + { + return swapMgr.rawIterator(spaceName, part); + } + }; + } + + /** * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @param topVer Topology version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 4390993..69ce7b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -353,7 +353,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (filter instanceof ScanQuery) { IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); - qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable); + qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, ((ScanQuery)filter).getPartition(), + isKeepPortable); if (grp != null) qry.projection(grp); @@ -496,10 +497,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal()); if (qry instanceof SqlQuery) { - SqlQuery p = (SqlQuery)qry; + final SqlQuery p = (SqlQuery)qry; if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) - return (QueryCursor<R>)new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p)); + return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return ctx.kernalContext().query().<K, V>queryLocal(ctx, p); + } + }); return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java index 7cb9efc..d68c377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java @@ -27,6 +27,9 @@ import java.util.*; * Query cursor implementation. */ public class QueryCursorImpl<T> implements QueryCursorEx<T> { + /** Query executor. */ + private Iterable<T> iterExec; + /** */ private Iterator<T> iter; @@ -34,18 +37,18 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> { private boolean iterTaken; /** */ - private Collection<GridQueryFieldMetadata> fieldsMeta; + private List<GridQueryFieldMetadata> fieldsMeta; /** - * @param iter Iterator. + * @param iterExec Query executor. */ - public QueryCursorImpl(Iterator<T> iter) { - this.iter = iter; + public QueryCursorImpl(Iterable<T> iterExec) { + this.iterExec = iterExec; } /** {@inheritDoc} */ @Override public Iterator<T> iterator() { - if (iter == null) + if (iter == null && iterTaken) throw new IgniteException("Cursor is closed."); if (iterTaken) @@ -53,12 +56,16 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> { iterTaken = true; + iter = iterExec.iterator(); + + assert iter != null; + return iter; } /** {@inheritDoc} */ @Override public List<T> getAll() { - ArrayList<T> all = new ArrayList<>(); + List<T> all = new ArrayList<>(); try { for (T t : this) // Implicitly calls iterator() to do all checks. @@ -103,14 +110,14 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> { /** * @param fieldsMeta SQL Fields query result metadata. */ - public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) { + public void fieldsMeta(List<GridQueryFieldMetadata> fieldsMeta) { this.fieldsMeta = fieldsMeta; } /** * @return SQL Fields query result metadata. */ - public Collection<GridQueryFieldMetadata> fieldsMeta() { + @Override public List<GridQueryFieldMetadata> fieldsMeta() { return fieldsMeta; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 0749f66..8ac3809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> } /** + * @return Keys belonging to partition. + */ + public Set<KeyCacheObject> keySet() { + return map.keySet(); + } + + /** * @return Entries belonging to partition. */ public Collection<GridDhtCacheEntry> entries() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 0658828..2d2db1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -76,7 +76,7 @@ import org.jetbrains.annotations.*; * </li> * <li> * Joins will work correctly only if joined objects are stored in - * collocated mode or at least one side of the join is stored in + * colocated mode or at least one side of the join is stored in * {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to * {@link AffinityKey} javadoc for more information about colocation. * </li> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index a579aab..2b93144 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage false, null, req.keyValueFilter(), + req.partition(), req.className(), req.clause(), req.includeMetaData(), @@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().clause(), clsName, qry.query().scanFilter(), + qry.query().partition(), qry.reducer(), qry.transform(), qry.query().pageSize(), @@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().clause(), null, null, + null, qry.reducer(), qry.transform(), qry.query().pageSize(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 7f0a5ec..5b82c34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -21,13 +21,17 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; + import org.jetbrains.annotations.*; import java.util.*; @@ -38,6 +42,13 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy * Query adapter. */ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { + /** Is local node predicate. */ + private static final IgnitePredicate<ClusterNode> IS_LOC_NODE = new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + return n.isLocal(); + } + }; + /** */ private final GridCacheContext<?, ?> cctx; @@ -56,6 +67,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** */ private final IgniteBiPredicate<Object, Object> filter; + /** Partition. */ + private Integer part; + /** */ private final boolean incMeta; @@ -95,6 +109,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param clsName Class name. * @param clause Clause. * @param filter Scan filter. + * @param part Partition. * @param incMeta Include metadata flag. * @param keepPortable Keep portable flag. */ @@ -103,16 +118,19 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate<Object, Object> filter, + @Nullable Integer part, boolean incMeta, boolean keepPortable) { assert cctx != null; assert type != null; + assert part == null || part >= 0; this.cctx = cctx; this.type = type; this.clsName = clsName; this.clause = clause; this.filter = filter; + this.part = part; this.incMeta = incMeta; this.keepPortable = keepPortable; @@ -132,6 +150,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param dedup Enable dedup flag. * @param prj Grid projection. * @param filter Key-value filter. + * @param part Partition. * @param clsName Class name. * @param clause Clause. * @param incMeta Include metadata flag. @@ -149,6 +168,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { boolean dedup, ClusterGroup prj, IgniteBiPredicate<Object, Object> filter, + @Nullable Integer part, @Nullable String clsName, String clause, boolean incMeta, @@ -165,6 +185,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.dedup = dedup; this.prj = prj; this.filter = filter; + this.part = part; this.clsName = clsName; this.clause = clause; this.incMeta = incMeta; @@ -334,6 +355,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } /** + * @return Partition. + */ + @Nullable public Integer partition() { + return part; + } + + /** * @throws IgniteCheckedException If query is invalid. */ public void validate() throws IgniteCheckedException { @@ -410,16 +438,18 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { taskHash = cctx.kernalContext().job().currentTaskNameHash(); - GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer, + final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer, (IgniteClosure<Object, Object>)rmtTransform, args); - GridCacheQueryManager qryMgr = cctx.queries(); + final GridCacheQueryManager qryMgr = cctx.queries(); boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); if (type == SQL_FIELDS || type == SPI) return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); + else if (type == SCAN && part != null && nodes.size() > 1) + return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr); else return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } @@ -439,15 +469,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return Collections.singletonList(cctx.localNode()); case REPLICATED: - if (prj != null) - return nodes(cctx, prj); + if (prj != null || partition() != null) + return nodes(cctx, prj, partition()); return cctx.affinityNode() ? Collections.singletonList(cctx.localNode()) : - Collections.singletonList(F.rand(nodes(cctx, null))); + Collections.singletonList(F.rand(nodes(cctx, null, partition()))); case PARTITIONED: - return nodes(cctx, prj); + return nodes(cctx, prj, partition()); default: throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode); @@ -459,17 +489,26 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param prj Projection (optional). * @return Collection of data nodes in provided projection (if any). */ - private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) { + private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, + @Nullable final ClusterGroup prj, @Nullable final Integer part) { assert cctx != null; + final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + Collection<ClusterNode> affNodes = CU.affinityNodes(cctx); - if (prj == null) + if (prj == null && part == null) return affNodes; + final Set<ClusterNode> owners = + part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer)); + return F.view(affNodes, new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { - return prj.node(n.id()) != null; + + return cctx.discovery().cacheAffinityNode(n, cctx.name()) && + (prj == null || prj.node(n.id()) != null) && + (part == null || owners.contains(n)); } }); } @@ -478,4 +517,94 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { @Override public String toString() { return S.toString(GridCacheQueryAdapter.class, this); } + + /** + * Wrapper for queries with fallback. + */ + private static class CacheQueryFallbackFuture<R> extends GridFutureAdapter<Collection<R>> + implements CacheQueryFuture<R> { + /** Query future. */ + private volatile GridCacheQueryFutureAdapter<?, ?, R> fut; + + /** Backups. */ + private final Queue<ClusterNode> nodes; + + /** Bean. */ + private final GridCacheQueryBean bean; + + /** Query manager. */ + private final GridCacheQueryManager qryMgr; + + /** + * @param nodes Backups. + * @param bean Bean. + * @param qryMgr Query manager. + */ + public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, GridCacheQueryBean bean, + GridCacheQueryManager qryMgr) { + this.nodes = fallbacks(nodes); + this.bean = bean; + this.qryMgr = qryMgr; + + init(); + } + + /** + * @param nodes Nodes. + */ + private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) { + Queue<ClusterNode> fallbacks = new LinkedList<>(); + + ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE)); + + if (node != null) + fallbacks.add(node); + + fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes); + + return fallbacks; + } + + /** + * + */ + private void init() { + ClusterNode node = nodes.poll(); + + GridCacheQueryFutureAdapter<?, ?, R> fut0 = + (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) : + qryMgr.queryDistributed(bean, Collections.singleton(node))); + + fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() { + @Override public void apply(IgniteInternalFuture<Collection<R>> fut) { + try { + onDone(fut.get()); + } + catch (IgniteCheckedException e) { + if (F.isEmpty(nodes)) + onDone(e); + else + init(); + } + } + }); + + fut = fut0; + } + + /** {@inheritDoc} */ + @Override public int available() { + return fut.available(); + } + + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteCheckedException { + return fut.cancel(); + } + + /** {@inheritDoc} */ + @Override public R next() { + return fut.next(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 32e9d63..6e71ba7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -52,6 +52,8 @@ import java.util.concurrent.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.GridClosureCallMode.*; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*; /** @@ -111,8 +113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final Object recipient = recipient(nodeId, entry.getKey()); entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() { - @Override - public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) + @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException { f.get().closeIfNotShared(recipient); } @@ -768,98 +769,138 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final boolean backups = qry.includeBackups() || cctx.isReplicated(); - final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { - private IgniteBiTuple<K, V> next; + final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = + new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { + private IgniteBiTuple<K, V> next; - private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); + private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - private Iterator<K> iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); + private Iterator<K> iter; - { - advance(); - } + private GridDhtLocalPartition locPart; - @Override public boolean onHasNext() { - return next != null; - } + { + Integer part = qry.partition(); - @Override public IgniteBiTuple<K, V> onNext() { - if (next == null) - throw new NoSuchElementException(); + if (part == null || dht == null) + iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); + else if (part < 0 || part >= cctx.affinity().partitions()) + iter = F.emptyIterator(); + else { + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - IgniteBiTuple<K, V> next0 = next; + locPart = dht.topology().localPartition(part, topVer, false); - advance(); + if (locPart == null || (locPart.state() != OWNING && locPart.state() != RENTING) || + !locPart.reserve()) + throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved"); - return next0; - } + iter = new Iterator<K>() { + private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator(); - private void advance() { - IgniteBiTuple<K, V> next0 = null; - - while (iter.hasNext()) { - next0 = null; + @Override public boolean hasNext() { + return iter0.hasNext(); + } - K key = iter.next(); + @Override public K next() { + KeyCacheObject key = iter0.next(); - V val; + return key.value(cctx.cacheObjectContext(), false); + } - try { - val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + @Override public void remove() { + iter0.remove(); + } + }; } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to peek value: " + e); - val = null; - } + advance(); + } - if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { - dht.sendTtlUpdateRequest(expiryPlc); + @Override public boolean onHasNext() { + return next != null; + } - expiryPlc = cctx.cache().expiryPolicy(plc); - } + @Override public IgniteBiTuple<K, V> onNext() { + if (next == null) + throw new NoSuchElementException(); - if (val != null) { - next0 = F.t(key, val); + IgniteBiTuple<K, V> next0 = next; - if (checkPredicate(next0)) - break; - else - next0 = null; - } + advance(); + + return next0; } - next = next0 != null ? - new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : - null; + private void advance() { + IgniteBiTuple<K, V> next0 = null; - if (next == null) - sendTtlUpdate(); - } + while (iter.hasNext()) { + next0 = null; - @Override protected void onClose() { - sendTtlUpdate(); - } + K key = iter.next(); + + V val; - private void sendTtlUpdate() { - if (dht != null && expiryPlc != null) { - dht.sendTtlUpdateRequest(expiryPlc); + try { + val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to peek value: " + e); + + val = null; + } - expiryPlc = null; + if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = cctx.cache().expiryPolicy(plc); + } + + if (val != null) { + next0 = F.t(key, val); + + if (checkPredicate(next0)) + break; + else + next0 = null; + } + } + + next = next0 != null ? + new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : + null; + + if (next == null) + sendTtlUpdate(); } - } - private boolean checkPredicate(Map.Entry<K, V> e) { - if (keyValFilter != null) { - Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + @Override protected void onClose() { + sendTtlUpdate(); - return keyValFilter.apply(e0.getKey(), e0.getValue()); + if (locPart != null) + locPart.release(); } - return true; - } - }; + private void sendTtlUpdate() { + if (dht != null && expiryPlc != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } + } + + private boolean checkPredicate(Map.Entry<K, V> e) { + if (keyValFilter != null) { + Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + + return keyValFilter.apply(e0.getKey(), e0.getValue()); + } + + return true; + } + }; final GridIterator<IgniteBiTuple<K, V>> it; @@ -914,7 +955,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throws IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); - Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups); + Integer part = qry.partition(); + + Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) : + cctx.swap().rawSwapIterator(part); return scanIterator(it, filter, qry.keepPortable()); } @@ -930,10 +974,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (cctx.offheapTiered() && filter != null) { OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable()); - return cctx.swap().rawOffHeapIterator(c, true, backups); + return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups); } else { - Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups); + Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups); return scanIterator(it, filter, qry.keepPortable()); } @@ -1222,7 +1266,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { // Preparing query closures. - IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer(); + IgniteClosure<Map.Entry<K, V>, Object> trans = + (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer(); + IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer(); injectResources(trans); @@ -1282,9 +1328,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte K key = row.getKey(); - // Filter backups for SCAN queries. Other types are filtered in indexing manager. - if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN && - !incBackups && !cctx.affinity().primary(cctx.localNode(), key, topVer)) { + // Filter backups for SCAN queries, if it isn't partition scan. + // Other types are filtered in indexing manager. + if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null && + cctx.config().getCacheMode() != LOCAL && !incBackups && + !cctx.affinity().primary(cctx.localNode(), key, topVer)) { if (log.isDebugEnabled()) log.debug("Ignoring backup element [row=" + row + ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups + @@ -1529,11 +1577,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false, qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId()))); } - catch (Error e) { - fut.onDone(e); - - throw e; - } catch (Throwable e) { fut.onDone(e); @@ -1843,7 +1886,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return new IgniteBiPredicate<K, V>() { @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE); + return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE); } }; } @@ -2920,6 +2963,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, null, null, + null, false, keepPortable); } @@ -2928,17 +2972,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * Creates user's predicate based scan query. * * @param filter Scan filter. + * @param part Partition. * @param keepPortable Keep portable flag. * @return Created query. */ - @SuppressWarnings("unchecked") public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, - boolean keepPortable) { + @Nullable Integer part, boolean keepPortable) { + return new GridCacheQueryAdapter<>(cctx, SCAN, null, null, (IgniteBiPredicate<Object, Object>)filter, + part, false, keepPortable); } @@ -2962,6 +3008,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte clsName, search, null, + null, false, keepPortable); } @@ -2982,6 +3029,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, qry, null, + null, false, keepPortable); } @@ -3002,6 +3050,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, qry, null, + null, incMeta, keepPortable); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 845077f..2113e7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -26,6 +26,8 @@ import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + import java.io.*; import java.nio.*; import java.util.*; @@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** */ private int taskHash; + /** Partition. */ + private int part; + /** * Required by {@link Externalizable} */ @@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache * @param clause Query clause. * @param clsName Query class name. * @param keyValFilter Key-value filter. + * @param part Partition. * @param rdc Reducer. * @param trans Transformer. * @param pageSize Page size. @@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache String clause, String clsName, IgniteBiPredicate<Object, Object> keyValFilter, + @Nullable Integer part, IgniteReducer<Object, Object> rdc, IgniteClosure<Object, Object> trans, int pageSize, @@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache this.clause = clause; this.clsName = clsName; this.keyValFilter = keyValFilter; + this.part = part == null ? -1 : part; this.rdc = rdc; this.trans = trans; this.pageSize = pageSize; @@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache return taskHash; } + /** + * @return partition. + */ + @Nullable public Integer partition() { + return part == -1 ? null : part; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -508,30 +523,36 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 16: - if (!writer.writeByteArray("rdcBytes", rdcBytes)) + if (!writer.writeInt("part", part)) return false; writer.incrementState(); case 17: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeByteArray("rdcBytes", rdcBytes)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskHash", taskHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeByteArray("transBytes", transBytes)) + if (!writer.writeInt("taskHash", taskHash)) return false; writer.incrementState(); case 20: + if (!writer.writeByteArray("transBytes", transBytes)) + return false; + + writer.incrementState(); + + case 21: if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1)) return false; @@ -658,7 +679,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 16: - rdcBytes = reader.readByteArray("rdcBytes"); + part = reader.readInt("part"); if (!reader.isLastRead()) return false; @@ -666,7 +687,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 17: - subjId = reader.readUuid("subjId"); + rdcBytes = reader.readByteArray("rdcBytes"); if (!reader.isLastRead()) return false; @@ -674,7 +695,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 18: - taskHash = reader.readInt("taskHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -682,7 +703,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 19: - transBytes = reader.readByteArray("transBytes"); + taskHash = reader.readInt("taskHash"); if (!reader.isLastRead()) return false; @@ -690,6 +711,14 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 20: + transBytes = reader.readByteArray("transBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 21: byte typeOrd; typeOrd = reader.readByte("type"); @@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 21; + return 22; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java index bf1d4ea..5e19b99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java @@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.query; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.processors.query.*; + +import java.util.*; /** * Extended query cursor interface allowing for "getAll" to output data into destination other than Collection. @@ -32,6 +35,11 @@ public interface QueryCursorEx<T> extends QueryCursor<T> { public void getAll(Consumer<T> c) throws IgniteCheckedException; /** + * @return Query metadata. + */ + public List<GridQueryFieldMetadata> fieldsMeta(); + + /** * Query value consumer. */ public static interface Consumer<T> { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index f516968..f74fe95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -114,7 +114,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), false, false); + new GridSetQueryPredicate<>(id, collocated), null, false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); @@ -345,7 +345,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite private GridCloseableIterator<T> iterator0() { try { CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), false, false); + new GridSetQueryPredicate<>(id, collocated), null, false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 0bb820d..7fcc284 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -58,9 +58,11 @@ public interface GridQueryIndexing { * * @param cctx Cache context. * @param qry Query. + * @param keepCacheObjects If {@code true}, cache objects representation will be preserved. * @return Cursor. */ - public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry); + public Iterable<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, + boolean keepCacheObjects); /** * Parses SQL query into two step query and executes it. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index ed8e1e2..e187713 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -539,16 +539,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param qry Query. * @return Cursor. */ - public QueryCursor<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) { + public Iterable<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) { checkxEnabled(); if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { + GridCacheContext<Object, Object> cacheCtx = ctx.cache().internalCache(space).context(); + return idx.queryTwoStep( - ctx.cache().internalCache(space).context(), - qry); + cacheCtx, + qry, + cacheCtx.keepPortable()); } finally { busyLock.leaveBusy(); @@ -715,12 +718,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { String sql = qry.getSql(); Object[] args = qry.getArgs(); - GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); + final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); sendQueryExecutedEvent(sql, args); - QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( - new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable())); + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()); + } + }); cursor.fieldsMeta(res.metaData()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 64eb1c1..bb451c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -934,7 +934,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { GridCacheQueryManager qryMgr = cache.context().queries(); - CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, false); + CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false); qry.keepAll(false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java index 42fe089..4946eb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java @@ -138,7 +138,7 @@ public final class GridJavaProcess { procCommands.add(javaBin); procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs); - if (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath")) { + if (jvmArgs == null || (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath"))) { String classpath = System.getProperty("java.class.path"); String sfcp = System.getProperty("surefire.test.class.path"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 0932212..9016b10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9025,11 +9025,11 @@ public abstract class IgniteUtils { hasShmem = false; else { try { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); hasShmem = true; } - catch (IgniteCheckedException e) { + catch (IgniteCheckedException ignore) { hasShmem = false; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java index 27a234f..c935c4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java @@ -112,7 +112,7 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint { boolean clear = true; try { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log); sock.connect(new InetSocketAddress("127.0.0.1", port), timeout);