#ignite-180: Add processor.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e4492869
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e4492869
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e4492869

Branch: refs/heads/ignite-180-1
Commit: e449286923890085b02dfc9b3db44d87fb081585
Parents: 063fee4
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Tue Apr 7 12:08:20 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Tue Apr 7 12:08:20 2015 +0300

----------------------------------------------------------------------
 .../configuration/CollectionConfiguration.java  |  24 ++-
 .../internal/processors/cache/CacheType.java    |   5 -
 .../processors/cache/GridCacheProcessor.java    |  17 --
 .../datastructures/DataStructuresProcessor.java | 190 ++++++++++++++-----
 4 files changed, 161 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4492869/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java
index a5a286f..40ab40d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CollectionConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.ignite.configuration;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.io.*;
+
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
 import static org.apache.ignite.cache.CacheMemoryMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
@@ -27,7 +29,7 @@ import static org.apache.ignite.cache.CacheMode.*;
 /**
  * Configuration for Ignite collections.
  */
-public class CollectionConfiguration {
+public class CollectionConfiguration implements Externalizable {
     /** Cache atomicity mode. */
     private CacheAtomicityMode atomicityMode = ATOMIC;
 
@@ -136,4 +138,24 @@ public class CollectionConfiguration {
     @Override public String toString() {
         return S.toString(CollectionConfiguration.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(atomicityMode.ordinal());
+        out.writeInt(cacheMode.ordinal());
+        out.writeObject(memoryMode);
+        out.writeInt(backups);
+        out.writeLong(offHeapMaxMem);
+        out.writeBoolean(collocated);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        atomicityMode = CacheAtomicityMode.fromOrdinal(in.readInt());
+        cacheMode = CacheMode.fromOrdinal(in.readInt());
+        memoryMode = (CacheMemoryMode) in.readObject();
+        backups = in.readInt();
+        offHeapMaxMem = in.readLong();
+        collocated = in.readBoolean();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4492869/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
index d1cbfcd..e0747b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
@@ -36,11 +36,6 @@ public enum CacheType {
     INTERNAL(false, SYSTEM_POOL),
 
     /**
-     * Internal datastructure's cache, should use separate thread pool.
-     */
-    DATASTRUCTURE(false, UTILITY_CACHE_POOL),
-
-    /**
      * Internal replicated cache, should use separate thread pool.
      */
     UTILITY(false, UTILITY_CACHE_POOL),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4492869/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 073dce2..c8d2ded 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1291,23 +1291,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Gets a collection of currentlty started caches.
-     *
-     * @return Collection of started cache names.
-     */
-    public Collection<CacheConfiguration> dataStructuresCacheNames() {
-        Collection<CacheConfiguration> res = new HashSet<>();
-        for (String name : registeredCaches.keySet()) {
-            DynamicCacheDescriptor desc = registeredCaches.get(name);
-
-            if (desc.cacheType() == CacheType.DATASTRUCTURE)
-                res.add(desc.cacheConfiguration());
-        }
-
-        return res;
-    }
-
-    /**
      * Gets cache mode.
      *
      * @param cacheName Cache name to check.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4492869/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index b8ace76..ed18785 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -95,7 +95,7 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
     private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, 
Map<String, DataStructureInfo>> utilityCache;
 
     /** */
-    private GridCacheProjectionEx<CacheDataStructuresCacheKey, 
List<CacheConfiguration>> utilityDataCache;
+    private GridCacheProjectionEx<CacheDataStructuresCacheKey, 
List<CacheCollectionInfo>> utilityDataCache;
 
     /**
      * @param ctx Context.
@@ -713,15 +713,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
     /**
      * @param cfg Collection configuration.
-     * @param i Index.
-     * @return Cache name.
-     */
-    private String getCacheName(CollectionConfiguration cfg, int i) {
-        return "data_structures_" + i;
-    }
-
-    /**
-     * @param cfg Collection configuration.
      * @param name Cache name.
      * @return Cache configuration.
      */
@@ -742,55 +733,22 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
      * @param cfg Collection configuration.
      * @return Cache name.
      */
-    private String findCompatibleConfiguration(CollectionConfiguration cfg) 
throws IgniteCheckedException {
-        List<CacheConfiguration> cfgs = 
utilityDataCache.get(DATA_STRUCTURES_CACHE_KEY);
-
-        if (cfgs == null)
-            return null;
-
-        for (CacheConfiguration ccfg : cfgs) {
-            if (ccfg.getAtomicityMode() == cfg.atomicityMode() &&
-                    ccfg.getMemoryMode() == cfg.memoryMode() &&
-                    ccfg.getCacheMode() == cfg.cacheMode() &&
-                    ccfg.getBackups() == cfg.backups() &&
-                    ccfg.getOffHeapMaxMemory() == cfg.offHeapMaxMem())
-                return ccfg.getName();
-        }
-
-        return null;
-    }
-
-    /**
-     * @param cfg Collection configuration.
-     * @return Cache name.
-     */
     private String compatibleConfiguration(CollectionConfiguration cfg) throws 
IgniteCheckedException {
-        String cacheName = findCompatibleConfiguration(cfg);
-
-        if (cacheName == null) {
-            try (IgniteInternalTx tx = utilityDataCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
-                cacheName = findCompatibleConfiguration(cfg);
+        T2<String, IgniteCheckedException> res =
+            utilityDataCache.invoke(DATA_STRUCTURES_CACHE_KEY, new 
AddDataCacheProcessor(cfg)).get();
 
-                if (cacheName == null) {
-                    List<CacheConfiguration> oldVal = 
utilityDataCache.get(DATA_STRUCTURES_CACHE_KEY);
+        IgniteCheckedException err = res.get2();
 
-                    cacheName = getCacheName(cfg, oldVal != null ? 
oldVal.size() : 0);
-
-                    CacheConfiguration newCfg = cacheConfiguration(cfg, 
cacheName);
-
-                    ctx.cache().dynamicStartCache(newCfg, cacheName, null, 
CacheType.DATASTRUCTURE, false).get();
+        if (err != null)
+            throw err;
 
-                    List<CacheConfiguration> newVal = oldVal != null ? new 
ArrayList(oldVal) :
-                            new ArrayList<CacheConfiguration>();
+        String cacheName = res.get1();
 
-                    newVal.add(newCfg);
+        assert cacheName != null;
 
-                    utilityDataCache.put(DATA_STRUCTURES_CACHE_KEY, newVal);
-                }
+        CacheConfiguration newCfg = cacheConfiguration(cfg, cacheName);
 
-                tx.commit();
-            }
-        }
+        ctx.cache().dynamicStartCache(newCfg, cacheName, null, 
CacheType.INTERNAL, false).get();
 
         return cacheName;
     }
@@ -1394,6 +1352,54 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
     /**
      *
      */
+    static class CacheCollectionInfo implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private String cacheName;
+
+        /** */
+        private CollectionConfiguration cfg;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public CacheCollectionInfo() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Collection cache name.
+         * @param cfg CollectionConfiguration.
+         */
+        public CacheCollectionInfo(String cacheName, CollectionConfiguration 
cfg) {
+            this.cacheName = cacheName;
+            this.cfg = cfg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            cfg = new CollectionConfiguration();
+            cfg.readExternal(in);
+            cacheName = U.readString(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            cfg.writeExternal(out);
+            U.writeString(out, cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheCollectionInfo.class, this);
+        }
+    }
+
+    /**
+     *
+     */
     static class QueueInfo extends CollectionInfo {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1695,6 +1701,86 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
     /**
      *
      */
+    static class AddDataCacheProcessor implements
+        EntryProcessor<CacheDataStructuresCacheKey, List<CacheCollectionInfo>,
+            T2<String, IgniteCheckedException>>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private CollectionConfiguration cfg;
+
+        /**
+         * @param cfg Data structure information.
+         */
+        AddDataCacheProcessor(CollectionConfiguration cfg) {
+            this.cfg = cfg;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public AddDataCacheProcessor() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public T2<String, IgniteCheckedException> process(
+            MutableEntry<CacheDataStructuresCacheKey, 
List<CacheCollectionInfo>> entry,
+            Object... args)
+        {
+            List<CacheCollectionInfo> list = entry.getValue();
+
+            if (list == null) {
+                list = new ArrayList<>();
+
+                list.add(new CacheCollectionInfo("datastructeres_0", cfg));
+
+                entry.setValue(list);
+
+                return new T2<>("datastructeres_0", null);
+            }
+
+            for (CacheCollectionInfo col : list) {
+                if (col.cfg.atomicityMode() == cfg.atomicityMode() &&
+                    col.cfg.memoryMode() == cfg.memoryMode() &&
+                    col.cfg.cacheMode() == cfg.cacheMode() &&
+                    col.cfg.backups() == cfg.backups() &&
+                    col.cfg.offHeapMaxMem() == cfg.offHeapMaxMem())
+
+                    return new T2<>(col.cacheName, null);
+            }
+
+            String newName = "datastructeres_" + list.size();
+
+            List<CacheCollectionInfo> newList = new ArrayList<>(list);
+
+            newList.add(new CacheCollectionInfo(newName, cfg));
+
+            return new T2<>(newName, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            cfg.writeExternal(out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            cfg = new CollectionConfiguration();
+
+            cfg.readExternal(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AddDataCacheProcessor.class, this);
+        }
+    }
+
+    /**
+     *
+     */
     static class RemoveDataStructureProcessor implements
         EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>,
             T2<Boolean, IgniteCheckedException>>, Externalizable {

Reply via email to