http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java index 10e4fb5..cd1c9ef 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java @@ -22,7 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; import org.apache.ignite.portables.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.future.*; @@ -50,13 +50,13 @@ public class GridAffinityAssignmentCache { private int backups; /** Affinity function. */ - private final GridCacheAffinityFunction aff; + private final CacheAffinityFunction aff; /** Partitions count. */ private final int partsCnt; /** Affinity mapper function. */ - private final GridCacheAffinityKeyMapper affMapper; + private final CacheAffinityKeyMapper affMapper; /** Affinity calculation results cache: topology version => partition => nodes. */ private final ConcurrentMap<Long, GridAffinityAssignment> affCache; @@ -82,8 +82,8 @@ public class GridAffinityAssignmentCache { * @param affMapper Affinity key mapper. */ @SuppressWarnings("unchecked") - public GridAffinityAssignmentCache(GridCacheContext ctx, String cacheName, GridCacheAffinityFunction aff, - GridCacheAffinityKeyMapper affMapper, int backups) { + public GridAffinityAssignmentCache(GridCacheContext ctx, String cacheName, CacheAffinityFunction aff, + CacheAffinityKeyMapper affMapper, int backups) { this.ctx = ctx; this.aff = aff; this.affMapper = affMapper; @@ -144,7 +144,7 @@ public class GridAffinityAssignmentCache { List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment(); List<List<ClusterNode>> assignment = aff.assignPartitions( - new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer, backups)); + new CacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer, backups)); GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java index c66176d..d6b7633 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java @@ -22,7 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.managers.eventstorage.*; import org.gridgain.grid.kernal.processors.*; @@ -364,7 +364,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** - * Requests {@link GridCacheAffinityFunction} and {@link GridCacheAffinityKeyMapper} from remote node. + * Requests {@link org.apache.ignite.cache.affinity.CacheAffinityFunction} and {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapper} from remote node. * * @param cacheName Name of cache on which affinity is requested. * @param n Node from which affinity is requested. @@ -376,8 +376,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter { GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure() .callAsyncNoFailover(BALANCE, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/).get(); - GridCacheAffinityFunction f = (GridCacheAffinityFunction)unmarshall(ctx, n.id(), t.get1()); - GridCacheAffinityKeyMapper m = (GridCacheAffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); + CacheAffinityFunction f = (CacheAffinityFunction)unmarshall(ctx, n.id(), t.get1()); + CacheAffinityKeyMapper m = (CacheAffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); assert m != null; @@ -458,10 +458,10 @@ public class GridAffinityProcessor extends GridProcessorAdapter { */ private static class AffinityInfo { /** Affinity function. */ - private GridCacheAffinityFunction affFunc; + private CacheAffinityFunction affFunc; /** Mapper */ - private GridCacheAffinityKeyMapper mapper; + private CacheAffinityKeyMapper mapper; /** Assignment. */ private GridAffinityAssignment assignment; @@ -475,7 +475,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @param assignment Partition assignment. * @param portableEnabled Portable enabled flag. */ - private AffinityInfo(GridCacheAffinityFunction affFunc, GridCacheAffinityKeyMapper mapper, + private AffinityInfo(CacheAffinityFunction affFunc, CacheAffinityKeyMapper mapper, GridAffinityAssignment assignment, boolean portableEnabled) { this.affFunc = affFunc; this.mapper = mapper; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java index 1a3f0ba..4594bba 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java @@ -20,7 +20,6 @@ package org.gridgain.grid.kernal.processors.affinity; import org.apache.ignite.*; import org.apache.ignite.resources.*; import org.gridgain.grid.*; -import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.managers.deployment.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -39,11 +38,11 @@ import java.util.concurrent.*; */ class GridAffinityUtils { /** - * Creates a job that will look up {@link GridCacheAffinityKeyMapper} and {@link GridCacheAffinityFunction} on a + * Creates a job that will look up {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapper} and {@link org.apache.ignite.cache.affinity.CacheAffinityFunction} on a * cache with given name. If they exist, this job will serialize and transfer them together with all deployment * information needed to unmarshal objects on remote node. Result is returned as a {@link GridTuple3}, - * where first object is {@link GridAffinityMessage} for {@link GridCacheAffinityFunction}, second object - * is {@link GridAffinityMessage} for {@link GridCacheAffinityKeyMapper} and third object is affinity assignment + * where first object is {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.CacheAffinityFunction}, second object + * is {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapper} and third object is affinity assignment * for given topology version. * * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java deleted file mode 100644 index 308e822..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.gridgain.grid.cache.affinity.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Cache affinity function context implementation. Simple bean that holds all required fields. - */ -public class GridCacheAffinityFunctionContextImpl implements GridCacheAffinityFunctionContext { - /** Topology snapshot. */ - private List<ClusterNode> topSnapshot; - - /** Previous affinity assignment. */ - private List<List<ClusterNode>> prevAssignment; - - /** Discovery event that caused this topology change. */ - private IgniteDiscoveryEvent discoEvt; - - /** Topology version. */ - private long topVer; - - /** Number of backups to assign. */ - private int backups; - - /** - * @param topSnapshot Topology snapshot. - * @param topVer Topology version. - */ - public GridCacheAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment, - IgniteDiscoveryEvent discoEvt, long topVer, int backups) { - this.topSnapshot = topSnapshot; - this.prevAssignment = prevAssignment; - this.discoEvt = discoEvt; - this.topVer = topVer; - this.backups = backups; - } - - /** {@inheritDoc} */ - @Nullable @Override public List<ClusterNode> previousAssignment(int part) { - return prevAssignment.get(part); - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> currentTopologySnapshot() { - return topSnapshot; - } - - /** {@inheritDoc} */ - @Override public long currentTopologyVersion() { - return topVer; - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteDiscoveryEvent discoveryEvent() { - return discoEvt; - } - - /** {@inheritDoc} */ - @Override public int backups() { - return backups; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheDefaultAffinityKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheDefaultAffinityKeyMapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheDefaultAffinityKeyMapper.java new file mode 100644 index 0000000..93fb87c --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheDefaultAffinityKeyMapper.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.resources.*; +import org.gridgain.grid.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.lang.annotation.*; +import java.lang.reflect.*; + +/** + * Default key affinity mapper. If key class has annotation {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapped}, + * then the value of annotated method or field will be used to get affinity value instead + * of the key itself. If there is no annotation, then the key is used as is. + * <p> + * Convenience affinity key adapter, {@link org.apache.ignite.cache.affinity.CacheAffinityKey} can be used in + * conjunction with this mapper to automatically provide custom affinity keys for cache keys. + * <p> + * If non-default affinity mapper is used, is should be provided via + * {@link GridCacheConfiguration#getAffinityMapper()} configuration property. + */ +public class CacheDefaultAffinityKeyMapper implements CacheAffinityKeyMapper { + /** */ + private static final long serialVersionUID = 0L; + + /** Reflection cache. */ + private GridReflectionCache reflectCache = new GridReflectionCache( + new P1<Field>() { + @Override public boolean apply(Field f) { + // Account for anonymous inner classes. + return f.getAnnotation(CacheAffinityKeyMapped.class) != null; + } + }, + new P1<Method>() { + @Override public boolean apply(Method m) { + // Account for anonymous inner classes. + Annotation ann = m.getAnnotation(CacheAffinityKeyMapped.class); + + if (ann != null) { + if (!F.isEmpty(m.getParameterTypes())) + throw new IllegalStateException("Method annotated with @GridCacheAffinityKey annotation " + + "cannot have parameters: " + m); + + return true; + } + + return false; + } + } + ); + + /** Logger. */ + @IgniteLoggerResource + private transient IgniteLogger log; + + /** + * If key class has annotation {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapped}, + * then the value of annotated method or field will be used to get affinity value instead + * of the key itself. If there is no annotation, then the key is returned as is. + * + * @param key Key to get affinity key for. + * @return Affinity key for given key. + */ + @Override public Object affinityKey(Object key) { + GridArgumentCheck.notNull(key, "key"); + + if (key instanceof PortableObject) { + PortableObject po = (PortableObject)key; + + try { + PortableMetadata meta = po.metaData(); + + if (meta != null) { + String affKeyFieldName = meta.affinityKeyFieldName(); + + if (affKeyFieldName != null) + return po.field(affKeyFieldName); + } + } + catch (PortableException e) { + U.error(log, "Failed to get affinity field from portable object: " + key, e); + } + } + else { + try { + Object o = reflectCache.firstFieldValue(key); + + if (o != null) + return o; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to access affinity field for key [field=" + + reflectCache.firstField(key.getClass()) + ", key=" + key + ']', e); + } + + try { + Object o = reflectCache.firstMethodValue(key); + + if (o != null) + return o; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to invoke affinity method for key [mtd=" + + reflectCache.firstMethod(key.getClass()) + ", key=" + key + ']', e); + } + } + + return key; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 1a31edd..9b9fe9f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -29,7 +29,6 @@ import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; @@ -158,7 +157,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im private GridCacheDataStructures dataStructures; /** Affinity impl. */ - private GridCacheAffinity<K> aff; + private IgniteCacheAffinity<K> aff; /** Whether this cache is GGFS data cache. */ private boolean ggfsDataCache; @@ -265,7 +264,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im qry = new GridCacheQueriesImpl<>(ctx, null); dataStructures = new GridCacheDataStructuresImpl<>(ctx); - aff = new GridCacheAffinityImpl<>(ctx); + aff = new CacheAffinityImpl<>(ctx); } /** @@ -349,8 +348,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return qry; } - /** {@inheritDoc} */ - @Override public GridCacheAffinity<K> affinity() { + public IgniteCacheAffinity<K> affinityProxy() { return aff; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java index 84ac35a..4012e4f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java @@ -18,8 +18,8 @@ package org.gridgain.grid.kernal.processors.cache; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -185,11 +185,11 @@ public class GridCacheAttributes implements Externalizable { affKeyBackups = cfg.getBackups(); - GridCacheAffinityFunction aff = cfg.getAffinity(); + CacheAffinityFunction aff = cfg.getAffinity(); if (aff != null) { - if (aff instanceof GridCacheConsistentHashAffinityFunction) { - GridCacheConsistentHashAffinityFunction aff0 = (GridCacheConsistentHashAffinityFunction) aff; + if (aff instanceof CacheConsistentHashAffinityFunction) { + CacheConsistentHashAffinityFunction aff0 = (CacheConsistentHashAffinityFunction) aff; affInclNeighbors = aff0.isExcludeNeighbors(); affReplicas = aff0.getDefaultReplicas(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDefaultAffinityKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDefaultAffinityKeyMapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDefaultAffinityKeyMapper.java deleted file mode 100644 index efcb798..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDefaultAffinityKeyMapper.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.portables.*; -import org.apache.ignite.resources.*; -import org.gridgain.grid.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.lang.annotation.*; -import java.lang.reflect.*; - -/** - * Default key affinity mapper. If key class has annotation {@link GridCacheAffinityKeyMapped}, - * then the value of annotated method or field will be used to get affinity value instead - * of the key itself. If there is no annotation, then the key is used as is. - * <p> - * Convenience affinity key adapter, {@link GridCacheAffinityKey} can be used in - * conjunction with this mapper to automatically provide custom affinity keys for cache keys. - * <p> - * If non-default affinity mapper is used, is should be provided via - * {@link GridCacheConfiguration#getAffinityMapper()} configuration property. - */ -public class GridCacheDefaultAffinityKeyMapper implements GridCacheAffinityKeyMapper { - /** */ - private static final long serialVersionUID = 0L; - - /** Reflection cache. */ - private GridReflectionCache reflectCache = new GridReflectionCache( - new P1<Field>() { - @Override public boolean apply(Field f) { - // Account for anonymous inner classes. - return f.getAnnotation(GridCacheAffinityKeyMapped.class) != null; - } - }, - new P1<Method>() { - @Override public boolean apply(Method m) { - // Account for anonymous inner classes. - Annotation ann = m.getAnnotation(GridCacheAffinityKeyMapped.class); - - if (ann != null) { - if (!F.isEmpty(m.getParameterTypes())) - throw new IllegalStateException("Method annotated with @GridCacheAffinityKey annotation " + - "cannot have parameters: " + m); - - return true; - } - - return false; - } - } - ); - - /** Logger. */ - @IgniteLoggerResource - private transient IgniteLogger log; - - /** - * If key class has annotation {@link GridCacheAffinityKeyMapped}, - * then the value of annotated method or field will be used to get affinity value instead - * of the key itself. If there is no annotation, then the key is returned as is. - * - * @param key Key to get affinity key for. - * @return Affinity key for given key. - */ - @Override public Object affinityKey(Object key) { - GridArgumentCheck.notNull(key, "key"); - - if (key instanceof PortableObject) { - PortableObject po = (PortableObject)key; - - try { - PortableMetadata meta = po.metaData(); - - if (meta != null) { - String affKeyFieldName = meta.affinityKeyFieldName(); - - if (affKeyFieldName != null) - return po.field(affKeyFieldName); - } - } - catch (PortableException e) { - U.error(log, "Failed to get affinity field from portable object: " + key, e); - } - } - else { - try { - Object o = reflectCache.firstFieldValue(key); - - if (o != null) - return o; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to access affinity field for key [field=" + - reflectCache.firstField(key.getClass()) + ", key=" + key + ']', e); - } - - try { - Object o = reflectCache.firstMethodValue(key); - - if (o != null) - return o; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to invoke affinity method for key [mtd=" + - reflectCache.firstMethod(key.getClass()) + ", key=" + key + ']', e); - } - } - - return key; - } - - /** {@inheritDoc} */ - @Override public void reset() { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java index 8d1fbb2..d55b507 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java @@ -249,7 +249,7 @@ public class GridCacheEntryImpl<K, V> implements GridCacheEntry<K, V>, Externali @Override public int partition() { GridCacheEntryEx<K, V> e = unwrapNoCreate(); - return e == null ? ctx.cache().affinity().partition(key) : e.partition(); + return e == null ? ctx.grid().affinity(ctx.cache().name()).partition(key) : e.partition(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java index 950092e..536de58 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java @@ -19,8 +19,7 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*; import org.gridgain.grid.util.future.*; import org.jetbrains.annotations.*; @@ -38,7 +37,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> protected final IgniteLogger log; /** Affinity. */ - protected final GridCacheAffinityFunction aff; + protected final CacheAffinityFunction aff; /** Start future (always completed by default). */ private final IgniteFuture finFut; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index d4b971d..6ff14dd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -26,10 +26,10 @@ import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.spi.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.cache.affinity.consistenthash.*; -import org.gridgain.grid.cache.affinity.fair.*; -import org.gridgain.grid.cache.affinity.rendezvous.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.fair.*; +import org.apache.ignite.cache.affinity.rendezvous.*; import org.gridgain.grid.cache.store.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.*; @@ -138,16 +138,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cfg.getAffinity() == null) { if (cfg.getCacheMode() == PARTITIONED) { - GridCacheConsistentHashAffinityFunction aff = new GridCacheConsistentHashAffinityFunction(); + CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction(); - aff.setHashIdResolver(new GridCacheAffinityNodeAddressHashResolver()); + aff.setHashIdResolver(new CacheAffinityNodeAddressHashResolver()); cfg.setAffinity(aff); } else if (cfg.getCacheMode() == REPLICATED) { - GridCacheConsistentHashAffinityFunction aff = new GridCacheConsistentHashAffinityFunction(false, 512); + CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction(false, 512); - aff.setHashIdResolver(new GridCacheAffinityNodeAddressHashResolver()); + aff.setHashIdResolver(new CacheAffinityNodeAddressHashResolver()); cfg.setAffinity(aff); @@ -158,11 +158,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { } else { if (cfg.getCacheMode() == PARTITIONED) { - if (cfg.getAffinity() instanceof GridCacheConsistentHashAffinityFunction) { - GridCacheConsistentHashAffinityFunction aff = (GridCacheConsistentHashAffinityFunction)cfg.getAffinity(); + if (cfg.getAffinity() instanceof CacheConsistentHashAffinityFunction) { + CacheConsistentHashAffinityFunction aff = (CacheConsistentHashAffinityFunction)cfg.getAffinity(); if (aff.getHashIdResolver() == null) - aff.setHashIdResolver(new GridCacheAffinityNodeAddressHashResolver()); + aff.setHashIdResolver(new CacheAffinityNodeAddressHashResolver()); } } } @@ -171,7 +171,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg.setBackups(Integer.MAX_VALUE); if (cfg.getAffinityMapper() == null) - cfg.setAffinityMapper(new GridCacheDefaultAffinityKeyMapper()); + cfg.setAffinityMapper(new CacheDefaultAffinityKeyMapper()); ctx.ggfsHelper().preProcessCacheConfiguration(cfg); @@ -268,20 +268,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ private void validate(IgniteConfiguration c, GridCacheConfiguration cc) throws IgniteCheckedException { if (cc.getCacheMode() == REPLICATED) { - if (cc.getAffinity() instanceof GridCachePartitionFairAffinity) + if (cc.getAffinity() instanceof CachePartitionFairAffinity) throw new IgniteCheckedException("REPLICATED cache can not be started with GridCachePartitionFairAffinity" + " [cacheName=" + cc.getName() + ']'); - if (cc.getAffinity() instanceof GridCacheConsistentHashAffinityFunction) { - GridCacheConsistentHashAffinityFunction aff = (GridCacheConsistentHashAffinityFunction)cc.getAffinity(); + if (cc.getAffinity() instanceof CacheConsistentHashAffinityFunction) { + CacheConsistentHashAffinityFunction aff = (CacheConsistentHashAffinityFunction)cc.getAffinity(); if (aff.isExcludeNeighbors()) throw new IgniteCheckedException("For REPLICATED cache flag 'excludeNeighbors' in " + "GridCacheConsistentHashAffinityFunction cannot be set [cacheName=" + cc.getName() + ']'); } - if (cc.getAffinity() instanceof GridCacheRendezvousAffinityFunction) { - GridCacheRendezvousAffinityFunction aff = (GridCacheRendezvousAffinityFunction)cc.getAffinity(); + if (cc.getAffinity() instanceof CacheRendezvousAffinityFunction) { + CacheRendezvousAffinityFunction aff = (CacheRendezvousAffinityFunction)cc.getAffinity(); if (aff.isExcludeNeighbors()) throw new IgniteCheckedException("For REPLICATED cache flag 'excludeNeighbors' in " + @@ -965,10 +965,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter cache : ctx.cache().internalCaches()) { GridCacheConfiguration cfg = cache.configuration(); - if (cfg.getAffinity() instanceof GridCacheConsistentHashAffinityFunction) { - GridCacheConsistentHashAffinityFunction aff = (GridCacheConsistentHashAffinityFunction)cfg.getAffinity(); + if (cfg.getAffinity() instanceof CacheConsistentHashAffinityFunction) { + CacheConsistentHashAffinityFunction aff = (CacheConsistentHashAffinityFunction)cfg.getAffinity(); - GridCacheAffinityNodeHashResolver hashIdRslvr = aff.getHashIdResolver(); + CacheAffinityNodeHashResolver hashIdRslvr = aff.getHashIdResolver(); assert hashIdRslvr != null; @@ -1829,12 +1829,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * */ - private static class LocalAffinityFunction implements GridCacheAffinityFunction { + private static class LocalAffinityFunction implements CacheAffinityFunction { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext affCtx) { + @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext affCtx) { ClusterNode locNode = null; for (ClusterNode n : affCtx.currentTopologySnapshot()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index ac13deb..caaa9a9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -22,7 +22,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.processors.cache.affinity.*; @@ -71,7 +70,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali private GridCacheDataStructures dataStructures; /** Affinity. */ - private GridCacheAffinity<K> aff; + private IgniteCacheAffinity<K> aff; /** * Empty constructor required for {@link Externalizable}. @@ -99,7 +98,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali qry = new GridCacheQueriesProxy<>(ctx, prj, (GridCacheQueriesEx<K, V>)delegate.queries()); dataStructures = new GridCacheDataStructuresProxy<>(ctx, ctx.cache().dataStructures()); - aff = new GridCacheAffinityProxy<>(ctx, ctx.cache().affinity()); + + Ignite ignite = ctx.grid(); + aff = new CacheAffinityProxy<>(ctx, ignite.<K>affinity(ctx.cache().name())); } /** @@ -144,11 +145,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public GridCacheAffinity<K> affinity() { - return aff; - } - - /** {@inheritDoc} */ @Override public GridCacheDataStructures dataStructures() { return dataStructures; } @@ -1862,7 +1858,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali qry = new GridCacheQueriesProxy<>(ctx, prj, (GridCacheQueriesEx<K, V>)delegate.queries()); dataStructures = new GridCacheDataStructuresProxy<>(ctx, ctx.cache().dataStructures()); - aff = new GridCacheAffinityProxy<>(ctx, ctx.cache().affinity()); + aff = new CacheAffinityProxy(ctx, ctx.grid().affinity(ctx.cache().name())); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityImpl.java new file mode 100644 index 0000000..e6112eb --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityImpl.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache.affinity; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.portables.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Affinity interface implementation. + */ +public class CacheAffinityImpl<K, V> implements IgniteCacheAffinity<K> { + /** Cache context. */ + private GridCacheContext<K, V> cctx; + + /** Logger. */ + private IgniteLogger log; + + /** + * @param cctx Context. + */ + public CacheAffinityImpl(GridCacheContext<K, V> cctx) { + this.cctx = cctx; + + log = cctx.logger(getClass()); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return cctx.config().getAffinity().partitions(); + } + + /** {@inheritDoc} */ + @Override public int partition(K key) { + A.notNull(key, "key"); + + return cctx.affinity().partition(key); + } + + /** {@inheritDoc} */ + @Override public boolean isPrimary(ClusterNode n, K key) { + A.notNull(n, "n", key, "key"); + + return cctx.affinity().primary(n, key, topologyVersion()); + } + + /** {@inheritDoc} */ + @Override public boolean isBackup(ClusterNode n, K key) { + A.notNull(n, "n", key, "key"); + + return cctx.affinity().backups(key, topologyVersion()).contains(n); + } + + /** {@inheritDoc} */ + @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) { + A.notNull(n, "n", key, "key"); + + return cctx.affinity().belongs(n, key, topologyVersion()); + } + + /** {@inheritDoc} */ + @Override public int[] primaryPartitions(ClusterNode n) { + A.notNull(n, "n"); + + long topVer = cctx.discovery().topologyVersion(); + + Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer); + + return U.toIntArray(parts); + } + + /** {@inheritDoc} */ + @Override public int[] backupPartitions(ClusterNode n) { + A.notNull(n, "n"); + + long topVer = cctx.discovery().topologyVersion(); + + Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer); + + return U.toIntArray(parts); + } + + /** {@inheritDoc} */ + @Override public int[] allPartitions(ClusterNode n) { + A.notNull(n, "p"); + + Collection<Integer> parts = new HashSet<>(); + + long topVer = cctx.discovery().topologyVersion(); + + for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) { + for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) { + if (n.id().equals(affNode.id())) { + parts.add(part); + + break; + } + } + } + + return U.toIntArray(parts); + } + + /** {@inheritDoc} */ + @Override public ClusterNode mapPartitionToNode(int part) { + A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions"); + + return F.first(cctx.affinity().nodes(part, topologyVersion())); + } + + /** {@inheritDoc} */ + @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) { + A.notNull(parts, "parts"); + + Map<Integer, ClusterNode> map = new HashMap<>(); + + if (!F.isEmpty(parts)) { + for (int p : parts) + map.put(p, mapPartitionToNode(p)); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Object affinityKey(K key) { + A.notNull(key, "key"); + + if (cctx.portableEnabled()) { + try { + key = (K)cctx.marshalToPortable(key); + } + catch (PortableException e) { + U.error(log, "Failed to marshal key to portable: " + key, e); + } + } + + return cctx.config().getAffinityMapper().affinityKey(key); + } + + /** {@inheritDoc} */ + @Override @Nullable public ClusterNode mapKeyToNode(K key) { + A.notNull(key, "key"); + + return F.first(mapKeysToNodes(F.asList(key)).keySet()); + } + + /** {@inheritDoc} */ + @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) { + A.notNull(keys, "keys"); + + long topVer = topologyVersion(); + + int nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size(); + + // Must return empty map if no alive nodes present or keys is empty. + Map<ClusterNode, Collection<K>> res = new HashMap<>(nodesCnt, 1.0f); + + for (K key : keys) { + ClusterNode primary = cctx.affinity().primary(key, topVer); + + if (primary != null) { + Collection<K> mapped = res.get(primary); + + if (mapped == null) { + mapped = new ArrayList<>(Math.max(keys.size() / nodesCnt, 16)); + + res.put(primary, mapped); + } + + mapped.add(key); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) { + A.notNull(key, "key"); + + return cctx.affinity().nodes(partition(key), topologyVersion()); + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) { + A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions"); + + return cctx.affinity().nodes(part, topologyVersion()); + } + + /** + * Gets current topology version. + * + * @return Topology version. + */ + private long topologyVersion() { + return cctx.affinity().affinityTopologyVersion(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityProxy.java new file mode 100644 index 0000000..a204aac --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityProxy.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache.affinity; + +import org.apache.ignite.IgniteCacheAffinity; +import org.apache.ignite.cluster.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Affinity interface implementation. + */ +public class CacheAffinityProxy<K, V> implements IgniteCacheAffinity<K>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache gateway. */ + private GridCacheGateway<K, V> gate; + + /** Affinity delegate. */ + private IgniteCacheAffinity<K> delegate; + + /** Context. */ + private GridCacheContext<K, V> cctx; + + /** + * Required by {@link Externalizable}. + */ + public CacheAffinityProxy() { + // No-op. + } + + /** + * @param cctx Context. + * @param delegate Delegate object. + */ + public CacheAffinityProxy(GridCacheContext<K, V> cctx, IgniteCacheAffinity<K> delegate) { + gate = cctx.gate(); + this.delegate = delegate; + this.cctx = cctx; + } + + /** {@inheritDoc} */ + @Override public int partitions() { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.partitions(); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public int partition(K key) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.partition(key); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public boolean isPrimary(ClusterNode n, K key) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.isPrimary(n, key); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public boolean isBackup(ClusterNode n, K key) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.isBackup(n, key); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.isPrimaryOrBackup(n, key); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public int[] primaryPartitions(ClusterNode n) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.primaryPartitions(n); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public int[] backupPartitions(ClusterNode n) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.backupPartitions(n); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public int[] allPartitions(ClusterNode n) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.allPartitions(n); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public ClusterNode mapPartitionToNode(int part) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.mapPartitionToNode(part); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.mapPartitionsToNodes(parts); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public Object affinityKey(K key) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.affinityKey(key); + } + finally { + gate.leave(old); + } + } + + + /** {@inheritDoc} */ + @Override @Nullable public ClusterNode mapKeyToNode(K key) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.mapKeyToNode(key); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.mapKeysToNodes(keys); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.mapKeyToPrimaryAndBackups(key); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) { + GridCacheProjectionImpl<K, V> old = gate.enter(null); + + try { + return delegate.mapPartitionToPrimaryAndBackups(part); + } + finally { + gate.leave(old); + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(cctx); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cctx = (GridCacheContext<K, V>)in.readObject(); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + private Object readResolve() throws ObjectStreamException { + return cctx.grid().affinity(cctx.cache().name()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityImpl.java deleted file mode 100644 index a820f9c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityImpl.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.affinity; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.portables.*; -import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Affinity interface implementation. - */ -public class GridCacheAffinityImpl<K, V> implements GridCacheAffinity<K> { - /** Cache context. */ - private GridCacheContext<K, V> cctx; - - /** Logger. */ - private IgniteLogger log; - - /** - * @param cctx Context. - */ - public GridCacheAffinityImpl(GridCacheContext<K, V> cctx) { - this.cctx = cctx; - - log = cctx.logger(getClass()); - } - - /** {@inheritDoc} */ - @Override public int partitions() { - return cctx.config().getAffinity().partitions(); - } - - /** {@inheritDoc} */ - @Override public int partition(K key) { - A.notNull(key, "key"); - - return cctx.affinity().partition(key); - } - - /** {@inheritDoc} */ - @Override public boolean isPrimary(ClusterNode n, K key) { - A.notNull(n, "n", key, "key"); - - return cctx.affinity().primary(n, key, topologyVersion()); - } - - /** {@inheritDoc} */ - @Override public boolean isBackup(ClusterNode n, K key) { - A.notNull(n, "n", key, "key"); - - return cctx.affinity().backups(key, topologyVersion()).contains(n); - } - - /** {@inheritDoc} */ - @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) { - A.notNull(n, "n", key, "key"); - - return cctx.affinity().belongs(n, key, topologyVersion()); - } - - /** {@inheritDoc} */ - @Override public int[] primaryPartitions(ClusterNode n) { - A.notNull(n, "n"); - - long topVer = cctx.discovery().topologyVersion(); - - Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer); - - return U.toIntArray(parts); - } - - /** {@inheritDoc} */ - @Override public int[] backupPartitions(ClusterNode n) { - A.notNull(n, "n"); - - long topVer = cctx.discovery().topologyVersion(); - - Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer); - - return U.toIntArray(parts); - } - - /** {@inheritDoc} */ - @Override public int[] allPartitions(ClusterNode n) { - A.notNull(n, "p"); - - Collection<Integer> parts = new HashSet<>(); - - long topVer = cctx.discovery().topologyVersion(); - - for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) { - for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) { - if (n.id().equals(affNode.id())) { - parts.add(part); - - break; - } - } - } - - return U.toIntArray(parts); - } - - /** {@inheritDoc} */ - @Override public ClusterNode mapPartitionToNode(int part) { - A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions"); - - return F.first(cctx.affinity().nodes(part, topologyVersion())); - } - - /** {@inheritDoc} */ - @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) { - A.notNull(parts, "parts"); - - Map<Integer, ClusterNode> map = new HashMap<>(); - - if (!F.isEmpty(parts)) { - for (int p : parts) - map.put(p, mapPartitionToNode(p)); - } - - return map; - } - - /** {@inheritDoc} */ - @Override public Object affinityKey(K key) { - A.notNull(key, "key"); - - if (cctx.portableEnabled()) { - try { - key = (K)cctx.marshalToPortable(key); - } - catch (PortableException e) { - U.error(log, "Failed to marshal key to portable: " + key, e); - } - } - - return cctx.config().getAffinityMapper().affinityKey(key); - } - - /** {@inheritDoc} */ - @Override @Nullable public ClusterNode mapKeyToNode(K key) { - A.notNull(key, "key"); - - return F.first(mapKeysToNodes(F.asList(key)).keySet()); - } - - /** {@inheritDoc} */ - @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) { - A.notNull(keys, "keys"); - - long topVer = topologyVersion(); - - int nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size(); - - // Must return empty map if no alive nodes present or keys is empty. - Map<ClusterNode, Collection<K>> res = new HashMap<>(nodesCnt, 1.0f); - - for (K key : keys) { - ClusterNode primary = cctx.affinity().primary(key, topVer); - - if (primary != null) { - Collection<K> mapped = res.get(primary); - - if (mapped == null) { - mapped = new ArrayList<>(Math.max(keys.size() / nodesCnt, 16)); - - res.put(primary, mapped); - } - - mapped.add(key); - } - } - - return res; - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) { - A.notNull(key, "key"); - - return cctx.affinity().nodes(partition(key), topologyVersion()); - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) { - A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions"); - - return cctx.affinity().nodes(part, topologyVersion()); - } - - /** - * Gets current topology version. - * - * @return Topology version. - */ - private long topologyVersion() { - return cctx.affinity().affinityTopologyVersion(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityProxy.java deleted file mode 100644 index f4e61f7..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityProxy.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.affinity; - -import org.apache.ignite.cluster.*; -import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Affinity interface implementation. - */ -public class GridCacheAffinityProxy<K, V> implements GridCacheAffinity<K>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Cache gateway. */ - private GridCacheGateway<K, V> gate; - - /** Affinity delegate. */ - private GridCacheAffinity<K> delegate; - - /** Context. */ - private GridCacheContext<K, V> cctx; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheAffinityProxy() { - // No-op. - } - - /** - * @param cctx Context. - * @param delegate Delegate object. - */ - public GridCacheAffinityProxy(GridCacheContext<K, V> cctx, GridCacheAffinity<K> delegate) { - gate = cctx.gate(); - this.delegate = delegate; - this.cctx = cctx; - } - - /** {@inheritDoc} */ - @Override public int partitions() { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.partitions(); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public int partition(K key) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.partition(key); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public boolean isPrimary(ClusterNode n, K key) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.isPrimary(n, key); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public boolean isBackup(ClusterNode n, K key) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.isBackup(n, key); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.isPrimaryOrBackup(n, key); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public int[] primaryPartitions(ClusterNode n) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.primaryPartitions(n); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public int[] backupPartitions(ClusterNode n) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.backupPartitions(n); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public int[] allPartitions(ClusterNode n) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.allPartitions(n); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode mapPartitionToNode(int part) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.mapPartitionToNode(part); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.mapPartitionsToNodes(parts); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public Object affinityKey(K key) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.affinityKey(key); - } - finally { - gate.leave(old); - } - } - - - /** {@inheritDoc} */ - @Override @Nullable public ClusterNode mapKeyToNode(K key) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.mapKeyToNode(key); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.mapKeysToNodes(keys); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.mapKeyToPrimaryAndBackups(key); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) { - GridCacheProjectionImpl<K, V> old = gate.enter(null); - - try { - return delegate.mapPartitionToPrimaryAndBackups(part); - } - finally { - gate.leave(old); - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(cctx); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cctx = (GridCacheContext<K, V>)in.readObject(); - } - - /** - * Reconstructs object on unmarshalling. - * - * @return Reconstructed object. - * @throws ObjectStreamException Thrown in case of unmarshalling error. - */ - private Object readResolve() throws ObjectStreamException { - return cctx.grid().cache(cctx.cache().name()).affinity(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java index e51a9fc..0b18de2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java @@ -17,7 +17,7 @@ package org.gridgain.grid.kernal.processors.cache.datastructures; -import org.gridgain.grid.cache.affinity.GridCacheAffinityKeyMapped; +import org.apache.ignite.cache.affinity.CacheAffinityKeyMapped; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -52,7 +52,7 @@ public class GridCacheInternalKeyImpl implements GridCacheInternalKey, Externali } /** {@inheritDoc} */ - @GridCacheAffinityKeyMapped + @CacheAffinityKeyMapped @Override public String name() { return name; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java index b8d603b..aaa11fe 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java @@ -19,7 +19,7 @@ package org.gridgain.grid.kernal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.tostring.*; @@ -636,7 +636,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** * @return Item affinity key. */ - @GridCacheAffinityKeyMapped + @CacheAffinityKeyMapped public Object affinityKey() { return queueName(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java index cacc933..4f2dc55 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java @@ -21,7 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -556,7 +556,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa /** * @return Item affinity key. */ - @GridCacheAffinityKeyMapped + @CacheAffinityKeyMapped public Object affinityKey() { return setName; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 04c86a9..83159f0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -22,7 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.managers.discovery.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; @@ -312,10 +312,10 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange. */ private boolean canCalculateAffinity(GridCacheContext<K, V> cacheCtx) { - GridCacheAffinityFunction affFunc = cacheCtx.config().getAffinity(); + CacheAffinityFunction affFunc = cacheCtx.config().getAffinity(); // Do not request affinity from remote nodes if affinity function is not centralized. - if (!U.hasAnnotation(affFunc, GridCacheCentralizedAffinityFunction.class)) + if (!U.hasAnnotation(affFunc, CacheCentralizedAffinityFunction.class)) return true; // If local node did not initiate exchange or local node is the only cache node in grid. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java index 7184c7b..d449396 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java @@ -597,7 +597,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma info.unmarshalValue(cctx, cctx.deploy().globalLoader()); // Entries available locally in DHT should not be loaded into near cache for reading. - if (!cctx.cache().affinity().isPrimaryOrBackup(cctx.localNode(), info.key())) { + if (!cctx.grid().affinity(ctx.cache().cache().name()).isPrimaryOrBackup(cctx.localNode(), info.key())) { GridNearCacheEntry<K, V> entry = cache().entryExx(info.key(), topVer); GridCacheVersion saved = savedVers.get(info.key()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java index 9817fd6..f9ea82f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java @@ -24,7 +24,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.resources.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.*; import org.gridgain.grid.util.*; @@ -1050,7 +1050,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { private final String cn = cacheName; /** */ - @GridCacheAffinityKeyMapped + @CacheAffinityKeyMapped private final Object ak = affKey; @Override public Object execute() { @@ -1074,7 +1074,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { private final String cn = cacheName; /** */ - @GridCacheAffinityKeyMapped + @CacheAffinityKeyMapped private final Object ak = affKey; @Override public Object execute() { @@ -1142,7 +1142,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { private final String cn = cacheName; /** */ - @GridCacheAffinityKeyMapped + @CacheAffinityKeyMapped private final Object ak = affKey; @Nullable @Override public Object execute() { @@ -1163,7 +1163,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { private final String cn = cacheName; /** */ - @GridCacheAffinityKeyMapped + @CacheAffinityKeyMapped private final Object ak = affKey; @Nullable @Override public Object execute() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java index 03e1c80..79ab954 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java @@ -22,7 +22,6 @@ import org.apache.ignite.dataload.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.util.typedef.*; import org.jetbrains.annotations.*; @@ -243,7 +242,7 @@ public class GridDataLoadCacheUpdaters { Map<Integer, Collection<K>> rmvPartMap = null; Map<Integer, Map<K, V>> putPartMap = null; - GridCacheAffinity<K> aff = cache.ignite().<K, V>cache(cache.getName()).affinity(); + IgniteCacheAffinity<K> aff = cache.ignite().affinity(cache.getName()); for (Map.Entry<K, V> entry : entries) { K key = entry.getKey(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java index 4408966..8fe4602 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java @@ -26,7 +26,7 @@ import org.apache.ignite.thread.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.managers.communication.*; import org.gridgain.grid.kernal.managers.eventstorage.*; @@ -71,6 +71,9 @@ public class GridGgfsDataManager extends GridGgfsManager { /** Data cache. */ private GridCache<Object, Object> dataCache; + /** Affinity */ + private IgniteCacheAffinity<Object> cacheAff; + /** */ private IgniteFuture<?> dataCacheStartFut; @@ -153,6 +156,8 @@ public class GridGgfsDataManager extends GridGgfsManager { dataCachePrj = ggfsCtx.kernalContext().cache().internalCache(ggfsCtx.configuration().getDataCacheName()); dataCache = ggfsCtx.kernalContext().cache().internalCache(ggfsCtx.configuration().getDataCacheName()); + cacheAff = ggfsCtx.kernalContext().grid().affinity(dataCache.name()); + dataCacheStartFut = ggfsCtx.kernalContext().cache().internalCache(ggfsCtx.configuration().getDataCacheName()) .preloader().startFuture(); @@ -164,7 +169,7 @@ public class GridGgfsDataManager extends GridGgfsManager { assert dataCachePrj != null; - GridCacheAffinityKeyMapper mapper = ggfsCtx.kernalContext().cache() + CacheAffinityKeyMapper mapper = ggfsCtx.kernalContext().cache() .internalCache(ggfsCtx.configuration().getDataCacheName()).configuration().getAffinityMapper(); grpSize = mapper instanceof IgniteFsGroupDataBlocksKeyMapper ? @@ -275,13 +280,13 @@ public class GridGgfsDataManager extends GridGgfsManager { UUID nodeId = ggfsCtx.kernalContext().localNodeId(); - if (prevAffKey != null && dataCache.affinity().mapKeyToNode(prevAffKey).isLocal()) + if (prevAffKey != null && cacheAff.mapKeyToNode(prevAffKey).isLocal()) return prevAffKey; while (true) { IgniteUuid key = new IgniteUuid(nodeId, affKeyGen.getAndIncrement()); - if (dataCache.affinity().mapKeyToNode(key).isLocal()) + if (cacheAff.mapKeyToNode(key).isLocal()) return key; } } @@ -293,7 +298,7 @@ public class GridGgfsDataManager extends GridGgfsManager { * @return Primary node for this key. */ public ClusterNode affinityNode(Object affinityKey) { - return dataCache.affinity().mapKeyToNode(affinityKey); + return cacheAff.mapKeyToNode(affinityKey); } /** @@ -349,7 +354,7 @@ public class GridGgfsDataManager extends GridGgfsManager { GridGgfsBlockKey key = new GridGgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(), fileInfo.evictExclude(), i); - Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key); + Collection<ClusterNode> affNodes = cacheAff.mapKeyToPrimaryAndBackups(key); assert affNodes != null && !affNodes.isEmpty(); @@ -782,8 +787,8 @@ public class GridGgfsDataManager extends GridGgfsManager { if (info.affinityKey() != null) { Collection<IgniteFsBlockLocation> res = new LinkedList<>(); - splitBlocks(start, len, maxLen, dataCache.affinity().mapKeyToPrimaryAndBackups( - new GridGgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 0)), res); + splitBlocks(start, len, maxLen, cacheAff.mapKeyToPrimaryAndBackups( + new GridGgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 0)), res); return res; } @@ -818,8 +823,8 @@ public class GridGgfsDataManager extends GridGgfsManager { if (range.belongs(pos)) { long partEnd = Math.min(range.endOffset() + 1, end); - Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups( - range.affinityKey()); + Collection<ClusterNode> affNodes = cacheAff.mapKeyToPrimaryAndBackups( + range.affinityKey()); if (log.isDebugEnabled()) log.debug("Calculated affinity for range [start=" + pos + ", end=" + partEnd + @@ -901,7 +906,7 @@ public class GridGgfsDataManager extends GridGgfsManager { GridGgfsBlockKey key = new GridGgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), grpIdx * grpSize); - Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key); + Collection<ClusterNode> affNodes = cacheAff.mapKeyToPrimaryAndBackups(key); if (log.isDebugEnabled()) log.debug("Mapped key to nodes [key=" + key + ", nodes=" + F.nodeIds(affNodes) + @@ -1453,7 +1458,8 @@ public class GridGgfsDataManager extends GridGgfsManager { // Will update range if necessary. GridGgfsBlockKey key = createBlockKey(block, fileInfo, affinityRange); - ClusterNode primaryNode = dataCachePrj.cache().affinity().mapKeyToNode(key); + Ignite ignite = dataCachePrj.gridProjection().ignite(); + ClusterNode primaryNode = ignite.affinity(dataCachePrj.name()).mapKeyToNode(key); if (block == first) { off = (int)blockStartOff;