ignite-342 review
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8baca046 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8baca046 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8baca046 Branch: refs/heads/ignite-141 Commit: 8baca046e02e703b5ce5c0a2f1a34ba279392a3e Parents: 11efb91 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Mon Mar 2 17:16:26 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Mon Mar 2 17:16:26 2015 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 13 +- .../processors/cache/GridCacheContext.java | 19 ++ .../processors/cache/GridCacheProcessor.java | 15 ++ ...acheAbstractUsersAffinityMapperSelfTest.java | 207 +++++++++++++++++++ ...dCacheAtomicUsersAffinityMapperSelfTest.java | 45 ++++ ...heReplicatedUsersAffinityMapperSelfTest.java | 45 ++++ .../GridCacheTxUsersAffinityMapperSelfTest.java | 45 ++++ 7 files changed, 388 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8baca046/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 9c12a17..69795b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -305,7 +305,18 @@ public class GridAffinityAssignmentCache { } } - return aff.partition(affMapper.affinityKey(key)); + return aff.partition(affinityKey(key)); + } + + /** + * If Key is {@link GridCacheInternal GridCacheInternal} entry when won't passed into user's mapper and + * will use {@link GridCacheDefaultAffinityKeyMapper default}. + * + * @param key Key. + * @return Affinity key. + */ + private Object affinityKey(Object key) { + return (key instanceof GridCacheInternal ? ctx.defaultAffMapper() : affMapper).affinityKey(key); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8baca046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 3ec013c..44f8e69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -164,6 +165,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Cached local rich node. */ private ClusterNode locNode; + /** Default cache affinity mapper. */ + private CacheAffinityKeyMapper affMapper; + /** * Thread local projection. If it's set it means that method call was initiated * by child projection of initial cache. @@ -1016,6 +1020,20 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return Default affinity key mapper. + */ + public CacheAffinityKeyMapper defaultAffMapper() { + return affMapper; + } + + /** + * Sets default affinity key mapper. + */ + public void defaultAffMapper(CacheAffinityKeyMapper dfltAffMapper) { + this.affMapper = dfltAffMapper; + } + + /** * @param p Single predicate. * @return Array containing single predicate. */ @@ -1770,6 +1788,7 @@ public class GridCacheContext<K, V> implements Externalizable { evictMgr = null; qryMgr = null; dataStructuresMgr = null; + affMapper = null; mgrs.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8baca046/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e99c706..f74f969 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -618,6 +618,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { U.startLifecycleAware(lifecycleAwares(cfg, jta.tmLookup(), cfgStore)); + // Init default key mapper. + CacheAffinityKeyMapper dfltAffMapper; + + if (cfg.getAffinityMapper().getClass().equals(GridCacheDefaultAffinityKeyMapper.class)) + dfltAffMapper = cfg.getAffinityMapper(); + else { + dfltAffMapper = new GridCacheDefaultAffinityKeyMapper(); + + prepare(cfg, dfltAffMapper, false); + } + cfgs[i] = cfg; // Replace original configuration value. GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); @@ -655,6 +666,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { drMgr, jta); + cacheCtx.defaultAffMapper(dfltAffMapper); + GridCacheAdapter cache = null; switch (cfg.getCacheMode()) { @@ -793,6 +806,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { drMgr, jta); + cacheCtx.defaultAffMapper(dfltAffMapper); + GridDhtCacheAdapter dht = null; switch (cfg.getAtomicityMode()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8baca046/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java new file mode 100644 index 0000000..71f28ce --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; + +/** + * Test affinity mapper. + */ +public abstract class GridCacheAbstractUsersAffinityMapperSelfTest extends GridCommonAbstractTest { + /** */ + private static final int KEY_CNT = 1000; + + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + public static final CacheAffinityKeyMapper AFFINITY_MAPPER = new UsersAffinityKeyMapper(); + + /** */ + public GridCacheAbstractUsersAffinityMapperSelfTest() { + super(false /* doesn't start grid */); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setName(null); + cacheCfg.setCacheMode(getCacheMode()); + cacheCfg.setAtomicityMode(getAtomicMode()); + cacheCfg.setDistributionMode(getDistributionMode()); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setPreloadMode(CachePreloadMode.SYNC); + cacheCfg.setAffinityMapper(AFFINITY_MAPPER); + + cfg.setCacheConfiguration(cacheCfg); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** + * @return Distribution mode. + */ + protected abstract CacheDistributionMode getDistributionMode(); + + /** + * @return Cache atomicity mode. + */ + protected abstract CacheAtomicityMode getAtomicMode(); + + /** + * @return Cache mode. + */ + protected abstract CacheMode getCacheMode(); + + /** + * @throws Exception If failed. + */ + public void testAffinityMapper() throws Exception { + IgniteCache<Object, Object> cache = startGrid(0).jcache(null); + + for (int i = 0; i < KEY_CNT; i++) { + cache.put(String.valueOf(i), String.valueOf(i)); + + cache.put(new TestAffinityKey(i, String.valueOf(i)), i); + } + + assertEquals(1, cache.get(new TestAffinityKey(1, "1"))); + + startGrid(1); + + for (int i = 0; i < KEY_CNT; i++) + grid(i % 2).compute().affinityRun(null, new TestAffinityKey(1, "1"), new NoopClosure()); + } + + /** + * Test key for field annotation. + */ + private static class TestAffinityKey implements Externalizable { + /** Key. */ + private int key; + + /** Affinity key. */ + @CacheAffinityKeyMapped + private String affKey; + + /** + * Constructor. + */ + public TestAffinityKey() { + } + + /** + * Constructor. + * + * @param key Key. + * @param affKey Affinity key. + */ + TestAffinityKey(int key, String affKey) { + this.key = key; + this.affKey = affKey; + } + + /** + * @return Key. + */ + public int key() { + return key; + } + + /** + * @return Affinity key. + */ + public String affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return o instanceof TestAffinityKey && key == ((TestAffinityKey)o).key(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key + affKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(key); + out.writeUTF(affKey); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + key = in.readInt(); + affKey = in.readUTF(); + } + } + + /** + * Users affinity mapper. + */ + private static class UsersAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper{ + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + GridArgumentCheck.notNull(key, "key"); + + assertFalse("GridCacheInternal entry mustn't be passed in user's key mapper.", + key instanceof GridCacheInternal); + + return super.affinityKey(key); + } + } + + /** + * Noop closure. + */ + private static class NoopClosure implements IgniteRunnable { + /** {@inheritDoc} */ + @Override public void run() { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8baca046/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicUsersAffinityMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicUsersAffinityMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicUsersAffinityMapperSelfTest.java new file mode 100644 index 0000000..8a80e35 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicUsersAffinityMapperSelfTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Test affinity mapper. + */ +public class GridCacheAtomicUsersAffinityMapperSelfTest extends GridCacheAbstractUsersAffinityMapperSelfTest { + /** */ + public GridCacheAtomicUsersAffinityMapperSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return CacheDistributionMode.PARTITIONED_ONLY; + }; + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return CacheMode.PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8baca046/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedUsersAffinityMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedUsersAffinityMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedUsersAffinityMapperSelfTest.java new file mode 100644 index 0000000..47e5dc7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedUsersAffinityMapperSelfTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Test affinity mapper. + */ +public class GridCacheReplicatedUsersAffinityMapperSelfTest extends GridCacheAbstractUsersAffinityMapperSelfTest { + /** */ + public GridCacheReplicatedUsersAffinityMapperSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return CacheDistributionMode.PARTITIONED_ONLY; + }; + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return CacheMode.REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8baca046/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxUsersAffinityMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxUsersAffinityMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxUsersAffinityMapperSelfTest.java new file mode 100644 index 0000000..61af04e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxUsersAffinityMapperSelfTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Test affinity mapper. + */ +public class GridCacheTxUsersAffinityMapperSelfTest extends GridCacheAbstractUsersAffinityMapperSelfTest { + /** */ + public GridCacheTxUsersAffinityMapperSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return CacheDistributionMode.PARTITIONED_ONLY; + }; + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return CacheMode.PARTITIONED; + } +}