Repository: incubator-ignite Updated Branches: refs/heads/sprint-1 36a4feb5e -> 4db446248
ignite-212 - p2p Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3e0695cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3e0695cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3e0695cb Branch: refs/heads/sprint-1 Commit: 3e0695cb26935972cebf3dd2678cf6669eb34b8f Parents: 70c50f5 Author: S.Vladykin <svlady...@gridgain.com> Authored: Sun Feb 15 22:34:25 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Sun Feb 15 22:34:25 2015 +0300 ---------------------------------------------------------------------- .../processors/query/h2/IgniteH2Indexing.java | 94 +++++++++++++++++--- 1 file changed, 84 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3e0695cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index f6a8e7c..7276121 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -62,6 +62,7 @@ import java.sql.*; import java.text.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.locks.*; import static org.apache.ignite.IgniteSystemProperties.*; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.*; @@ -1142,7 +1143,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } if (cfg.isUseOptimizedSerializer()) - Utils.serializer = h2Serializer(); + Utils.serializer = h2Serializer(ctx != null && ctx.config().isPeerClassLoadingEnabled()); long maxOffHeapMemory = cfg.getMaxOffHeapMemory(); @@ -1198,18 +1199,91 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param p2pEnabled If peer-deployment is enabled. * @return Serializer. */ - protected JavaObjectSerializer h2Serializer() { - return new JavaObjectSerializer() { - @Override public byte[] serialize(Object obj) throws Exception { - return marshaller.marshal(obj); - } + protected JavaObjectSerializer h2Serializer(boolean p2pEnabled) { + return p2pEnabled ? + new JavaObjectSerializer() { + /** */ + private volatile Map<ClassLoader, Byte> ldr2id = Collections.emptyMap(); - @Override public Object deserialize(byte[] bytes) throws Exception { - return marshaller.unmarshal(bytes, null); - } - }; + /** */ + private volatile Map<Byte, ClassLoader> id2ldr = Collections.emptyMap(); + + /** */ + private byte ldrIdGen = Byte.MIN_VALUE; + + /** */ + private final Lock lock = new ReentrantLock(); + + @Override public byte[] serialize(Object obj) throws Exception { + ClassLoader ldr = obj.getClass().getClassLoader(); + + Byte ldrId = ldr2id.get(ldr); + + if (ldrId == null) { + lock.lock(); + + try { + ldrId = ldr2id.get(ldr); + + if (ldrId == null) { + ldrId = ldrIdGen++; + + if (id2ldr.containsKey(ldrId)) // Overflow. + throw new IgniteException("Failed to add new peer-to-peer class loader."); + + Map<Byte, ClassLoader> id2ldr0 = new HashMap<>(id2ldr); + Map<ClassLoader, Byte> ldr2id0 = new IdentityHashMap<>(ldr2id); + + id2ldr0.put(ldrId, ldr); + ldr2id0.put(ldr, ldrId); + + ldr2id = ldr2id0; + id2ldr = id2ldr0; + } + } + finally { + lock.unlock(); + } + } + + byte[] bytes = marshaller.marshal(obj); + + int len = bytes.length; + + bytes = Arrays.copyOf(bytes, len + 1); // The last byte is for ldrId. + + bytes[len] = ldrId; + + return bytes; + } + + @Override public Object deserialize(byte[] bytes) throws Exception { + int last = bytes.length - 1; + + byte ldrId = bytes[last]; + + ClassLoader ldr = id2ldr.get(ldrId); + + if (ldr == null) + throw new IllegalStateException("Class loader was not found: " + ldrId); + + bytes = Arrays.copyOf(bytes, last); // Trim the last byte. + + return marshaller.unmarshal(bytes, ldr); + } + } : + new JavaObjectSerializer() { + @Override public byte[] serialize(Object obj) throws Exception { + return marshaller.marshal(obj); + } + + @Override public Object deserialize(byte[] bytes) throws Exception { + return marshaller.unmarshal(bytes, null); + } + }; } /**