#ignite-180: Add processor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e4492869 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e4492869 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e4492869 Branch: refs/heads/ignite-180-1 Commit: e449286923890085b02dfc9b3db44d87fb081585 Parents: 063fee4 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue Apr 7 12:08:20 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue Apr 7 12:08:20 2015 +0300 ---------------------------------------------------------------------- .../configuration/CollectionConfiguration.java | 24 ++- .../internal/processors/cache/CacheType.java | 5 - .../processors/cache/GridCacheProcessor.java | 17 -- .../datastructures/DataStructuresProcessor.java | 190 ++++++++++++++----- 4 files changed, 161 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4492869/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java index a5a286f..40ab40d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java @@ -20,6 +20,8 @@ package org.apache.ignite.configuration; import org.apache.ignite.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; + import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMemoryMode.*; import static org.apache.ignite.cache.CacheMode.*; @@ -27,7 +29,7 @@ import static org.apache.ignite.cache.CacheMode.*; /** * Configuration for Ignite collections. */ -public class CollectionConfiguration { +public class CollectionConfiguration implements Externalizable { /** Cache atomicity mode. */ private CacheAtomicityMode atomicityMode = ATOMIC; @@ -136,4 +138,24 @@ public class CollectionConfiguration { @Override public String toString() { return S.toString(CollectionConfiguration.class, this); } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(atomicityMode.ordinal()); + out.writeInt(cacheMode.ordinal()); + out.writeObject(memoryMode); + out.writeInt(backups); + out.writeLong(offHeapMaxMem); + out.writeBoolean(collocated); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + atomicityMode = CacheAtomicityMode.fromOrdinal(in.readInt()); + cacheMode = CacheMode.fromOrdinal(in.readInt()); + memoryMode = (CacheMemoryMode) in.readObject(); + backups = in.readInt(); + offHeapMaxMem = in.readLong(); + collocated = in.readBoolean(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4492869/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java index d1cbfcd..e0747b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java @@ -36,11 +36,6 @@ public enum CacheType { INTERNAL(false, SYSTEM_POOL), /** - * Internal datastructure's cache, should use separate thread pool. - */ - DATASTRUCTURE(false, UTILITY_CACHE_POOL), - - /** * Internal replicated cache, should use separate thread pool. */ UTILITY(false, UTILITY_CACHE_POOL), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4492869/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 073dce2..c8d2ded 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1291,23 +1291,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Gets a collection of currentlty started caches. - * - * @return Collection of started cache names. - */ - public Collection<CacheConfiguration> dataStructuresCacheNames() { - Collection<CacheConfiguration> res = new HashSet<>(); - for (String name : registeredCaches.keySet()) { - DynamicCacheDescriptor desc = registeredCaches.get(name); - - if (desc.cacheType() == CacheType.DATASTRUCTURE) - res.add(desc.cacheConfiguration()); - } - - return res; - } - - /** * Gets cache mode. * * @param cacheName Cache name to check. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4492869/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index b8ace76..ed18785 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -95,7 +95,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> utilityCache; /** */ - private GridCacheProjectionEx<CacheDataStructuresCacheKey, List<CacheConfiguration>> utilityDataCache; + private GridCacheProjectionEx<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache; /** * @param ctx Context. @@ -713,15 +713,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** * @param cfg Collection configuration. - * @param i Index. - * @return Cache name. - */ - private String getCacheName(CollectionConfiguration cfg, int i) { - return "data_structures_" + i; - } - - /** - * @param cfg Collection configuration. * @param name Cache name. * @return Cache configuration. */ @@ -742,55 +733,22 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @param cfg Collection configuration. * @return Cache name. */ - private String findCompatibleConfiguration(CollectionConfiguration cfg) throws IgniteCheckedException { - List<CacheConfiguration> cfgs = utilityDataCache.get(DATA_STRUCTURES_CACHE_KEY); - - if (cfgs == null) - return null; - - for (CacheConfiguration ccfg : cfgs) { - if (ccfg.getAtomicityMode() == cfg.atomicityMode() && - ccfg.getMemoryMode() == cfg.memoryMode() && - ccfg.getCacheMode() == cfg.cacheMode() && - ccfg.getBackups() == cfg.backups() && - ccfg.getOffHeapMaxMemory() == cfg.offHeapMaxMem()) - return ccfg.getName(); - } - - return null; - } - - /** - * @param cfg Collection configuration. - * @return Cache name. - */ private String compatibleConfiguration(CollectionConfiguration cfg) throws IgniteCheckedException { - String cacheName = findCompatibleConfiguration(cfg); - - if (cacheName == null) { - try (IgniteInternalTx tx = utilityDataCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - cacheName = findCompatibleConfiguration(cfg); + T2<String, IgniteCheckedException> res = + utilityDataCache.invoke(DATA_STRUCTURES_CACHE_KEY, new AddDataCacheProcessor(cfg)).get(); - if (cacheName == null) { - List<CacheConfiguration> oldVal = utilityDataCache.get(DATA_STRUCTURES_CACHE_KEY); + IgniteCheckedException err = res.get2(); - cacheName = getCacheName(cfg, oldVal != null ? oldVal.size() : 0); - - CacheConfiguration newCfg = cacheConfiguration(cfg, cacheName); - - ctx.cache().dynamicStartCache(newCfg, cacheName, null, CacheType.DATASTRUCTURE, false).get(); + if (err != null) + throw err; - List<CacheConfiguration> newVal = oldVal != null ? new ArrayList(oldVal) : - new ArrayList<CacheConfiguration>(); + String cacheName = res.get1(); - newVal.add(newCfg); + assert cacheName != null; - utilityDataCache.put(DATA_STRUCTURES_CACHE_KEY, newVal); - } + CacheConfiguration newCfg = cacheConfiguration(cfg, cacheName); - tx.commit(); - } - } + ctx.cache().dynamicStartCache(newCfg, cacheName, null, CacheType.INTERNAL, false).get(); return cacheName; } @@ -1394,6 +1352,54 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** * */ + static class CacheCollectionInfo implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String cacheName; + + /** */ + private CollectionConfiguration cfg; + + /** + * Required by {@link Externalizable}. + */ + public CacheCollectionInfo() { + // No-op. + } + + /** + * @param cacheName Collection cache name. + * @param cfg CollectionConfiguration. + */ + public CacheCollectionInfo(String cacheName, CollectionConfiguration cfg) { + this.cacheName = cacheName; + this.cfg = cfg; + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cfg = new CollectionConfiguration(); + cfg.readExternal(in); + cacheName = U.readString(in); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + cfg.writeExternal(out); + U.writeString(out, cacheName); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheCollectionInfo.class, this); + } + } + + /** + * + */ static class QueueInfo extends CollectionInfo { /** */ private static final long serialVersionUID = 0L; @@ -1695,6 +1701,86 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** * */ + static class AddDataCacheProcessor implements + EntryProcessor<CacheDataStructuresCacheKey, List<CacheCollectionInfo>, + T2<String, IgniteCheckedException>>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private CollectionConfiguration cfg; + + /** + * @param cfg Data structure information. + */ + AddDataCacheProcessor(CollectionConfiguration cfg) { + this.cfg = cfg; + } + + /** + * Required by {@link Externalizable}. + */ + public AddDataCacheProcessor() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public T2<String, IgniteCheckedException> process( + MutableEntry<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> entry, + Object... args) + { + List<CacheCollectionInfo> list = entry.getValue(); + + if (list == null) { + list = new ArrayList<>(); + + list.add(new CacheCollectionInfo("datastructeres_0", cfg)); + + entry.setValue(list); + + return new T2<>("datastructeres_0", null); + } + + for (CacheCollectionInfo col : list) { + if (col.cfg.atomicityMode() == cfg.atomicityMode() && + col.cfg.memoryMode() == cfg.memoryMode() && + col.cfg.cacheMode() == cfg.cacheMode() && + col.cfg.backups() == cfg.backups() && + col.cfg.offHeapMaxMem() == cfg.offHeapMaxMem()) + + return new T2<>(col.cacheName, null); + } + + String newName = "datastructeres_" + list.size(); + + List<CacheCollectionInfo> newList = new ArrayList<>(list); + + newList.add(new CacheCollectionInfo(newName, cfg)); + + return new T2<>(newName, null); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + cfg.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cfg = new CollectionConfiguration(); + + cfg.readExternal(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AddDataCacheProcessor.class, this); + } + } + + /** + * + */ static class RemoveDataStructureProcessor implements EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, T2<Boolean, IgniteCheckedException>>, Externalizable {