# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/05bb9d2e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/05bb9d2e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/05bb9d2e Branch: refs/heads/ignite-51 Commit: 05bb9d2e0586f1934d4cd1771b82e3b681e292fc Parents: ea39d66 b2675bc Author: sboikov <sboi...@gridgain.com> Authored: Tue Mar 3 07:39:31 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Mar 3 07:39:31 2015 +0300 ---------------------------------------------------------------------- examples/pom-standalone.xml | 2 +- .../java/org/apache/ignite/IgniteCluster.java | 8 +- .../configuration/IgniteConfiguration.java | 5 +- .../org/apache/ignite/internal/IgnitionEx.java | 592 +++++++++---------- .../affinity/GridAffinityAssignmentCache.java | 13 +- .../processors/cache/CacheMetricsImpl.java | 4 + .../processors/cache/GridCacheContext.java | 19 + .../processors/cache/GridCacheMapEntry.java | 9 + .../processors/cache/GridCacheProcessor.java | 15 + .../processors/cache/GridCacheStoreManager.java | 12 +- .../GridDistributedCacheAdapter.java | 2 + .../distributed/dht/GridDhtCacheEntry.java | 3 + .../distributed/dht/GridDhtLocalPartition.java | 5 + .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 2 +- .../cache/transactions/IgniteTxManager.java | 7 +- .../dataload/IgniteDataLoaderImpl.java | 29 +- .../core/src/main/resources/ignite.properties | 2 +- .../GridCacheAbstractLocalStoreSelfTest.java | 440 ++++++++++++++ .../cache/GridCacheAbstractMetricsSelfTest.java | 42 ++ ...acheAbstractUsersAffinityMapperSelfTest.java | 207 +++++++ ...dCacheAtomicUsersAffinityMapperSelfTest.java | 45 ++ .../GridCachePartitionedLocalStoreSelfTest.java | 51 ++ ...chePartitionedOffHeapLocalStoreSelfTest.java | 56 ++ .../GridCacheReplicatedLocalStoreSelfTest.java | 51 ++ ...heReplicatedUsersAffinityMapperSelfTest.java | 45 ++ ...ridCacheTxPartitionedLocalStoreSelfTest.java | 51 ++ .../GridCacheTxUsersAffinityMapperSelfTest.java | 45 ++ .../GridCacheQueueCleanupSelfTest.java | 1 - .../ignite/testsuites/IgniteCacheTestSuite.java | 4 + modules/scalar/pom.xml | 6 + pom.xml | 4 +- 32 files changed, 1422 insertions(+), 357 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 51da1da,69795b1..40fa43a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@@ -296,11 -296,27 +296,22 @@@ public class GridAffinityAssignmentCach * @return Partition. */ public int partition(Object key) { - if (ctx.portableEnabled()) { - try { - key = ctx.marshalToPortable(key); - } - catch (IgniteException e) { - U.error(log, "Failed to marshal key to portable: " + key, e); - } - } + // TODO IGNITE-51. + if (key instanceof CacheObject) + key = ((CacheObject)key).value(ctx, false); - return aff.partition(affMapper.affinityKey(key)); + return aff.partition(affinityKey(key)); + } + + /** + * If Key is {@link GridCacheInternal GridCacheInternal} entry when won't passed into user's mapper and + * will use {@link GridCacheDefaultAffinityKeyMapper default}. + * + * @param key Key. + * @return Affinity key. + */ + private Object affinityKey(Object key) { + return (key instanceof GridCacheInternal ? ctx.defaultAffMapper() : affMapper).affinityKey(key); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 8dbabc6,00190d9..42c3fea --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@@ -284,12 -276,14 +284,14 @@@ public abstract class GridDistributedCa else dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; - try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) { + try (IgniteDataLoader<KeyCacheObject, Object> dataLdr = ignite.dataLoader(cacheName)) { + ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0); + - dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched()); + dataLdr.updater(GridDataLoadCacheUpdaters.<KeyCacheObject, Object>batched()); - for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) { + for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { if (!locPart.isEmpty() && locPart.primary(topVer)) { - for (GridDhtCacheEntry<K, V> o : locPart.entries()) { + for (GridDhtCacheEntry o : locPart.entries()) { if (!o.obsoleteOrDeleted()) dataLdr.removeData(o.key()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index ef9a41f,29f7363..9b34f68 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@@ -486,8 -486,10 +486,10 @@@ public class GridDhtLocalPartition impl assert state() == EVICTED; try { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> it = cctx.swap().iterator(id, false); + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = cctx.swap().iterator(id, false); + boolean isLocStore = cctx.store().isLocalStore(); + if (it != null) { // We can safely remove these values because no entries will be created for evicted partition. while (it.hasNext()) { @@@ -495,9 -497,12 +497,12 @@@ byte[] keyBytes = entry.getKey(); - K key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader()); + KeyCacheObject key = cctx.toCacheKeyObject(null, keyBytes); - cctx.swap().remove(key, keyBytes); + cctx.swap().remove(key); + + if (isLocStore) + cctx.store().removeFromStore(null, key); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 73f4ce7,af57ce4..b2329bb --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@@ -1240,10 -1240,11 +1240,11 @@@ public class IgniteTxManager extends Gr cctx.txMetrics().onTxCommit(); for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); if (cacheCtx.cache().configuration().isStatisticsEnabled()) - cacheCtx.cache().metrics0().onTxCommit(System.nanoTime() - tx.startTime()); + // Convert start time from ms to ns. + cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() - tx.startTime()) * 1000); } } @@@ -1314,9 -1315,11 +1315,11 @@@ cctx.txMetrics().onTxRollback(); for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - cacheCtx.cache().metrics0().onTxRollback(System.nanoTime() - tx.startTime()); + if (cacheCtx.cache().configuration().isStatisticsEnabled()) + // Convert start time from ms to ns. + cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - tx.startTime()) * 1000); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index f9cb4cd,ed3bbcb..ab9b29b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@@ -486,13 -437,7 +489,7 @@@ public class IgniteDataLoaderImpl<K, V ) { assert entries != null; - if (remaps >= MAX_REMAP_CNT) { - resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + remaps)); - - return; - } - - Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>(); + Map<ClusterNode, Collection<IgniteDataLoaderEntry>> mappings = new HashMap<>(); boolean initPda = ctx.deploy().enabled() && jobPda == null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05bb9d2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java ----------------------------------------------------------------------