GG-9900 - Portable marshaller refactoring

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/15b4ebb1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/15b4ebb1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/15b4ebb1

Branch: refs/heads/ignite-471
Commit: 15b4ebb19875cba90bfcebe8a2f62fbe8c3f8213
Parents: d838a9d
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Mar 11 18:30:04 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Mar 11 18:30:04 2015 -0700

----------------------------------------------------------------------
 .../GridClientOptimizedMarshaller.java          |   7 +-
 .../optimized/OptimizedMarshaller.java          |  45 ++--
 .../OptimizedObjectStreamRegistry.java          | 224 -------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 .../OptimizedObjectStreamSelfTest.java          |  47 +---
 5 files changed, 27 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
index 00f96b8..ff77177 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
@@ -50,17 +50,14 @@ public class GridClientOptimizedMarshaller implements 
GridClientMarshaller {
      * Constructs optimized marshaller with specific parameters.
      *
      * @param requireSer Require serializable flag.
-     * @param poolSize Object streams pool size.
      * @throws IOException If an I/O error occurs while writing stream header.
      * @throws IgniteException If this marshaller is not supported on the 
current JVM.
      * @see OptimizedMarshaller
      */
-    public GridClientOptimizedMarshaller(boolean requireSer, int poolSize) 
throws IOException {
-        opMarsh = new OptimizedMarshaller();
+    public GridClientOptimizedMarshaller(boolean requireSer) throws 
IOException {
+        opMarsh = new OptimizedMarshaller(requireSer);
 
         opMarsh.setContext(new ClientMarshallerContext());
-        opMarsh.setRequireSerializable(requireSer);
-        opMarsh.setPoolSize(poolSize);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
 
b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
index bb63fdd..8eaf23b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
@@ -19,6 +19,8 @@ package org.apache.ignite.marshaller.optimized;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.marshaller.*;
 import org.jetbrains.annotations.*;
 import sun.misc.*;
@@ -74,6 +76,9 @@ import java.nio.*;
  * For information about Spring framework visit <a 
href="http://www.springframework.org/";>www.springframework.org</a>
  */
 public class OptimizedMarshaller extends AbstractMarshaller {
+    /** Initial buffer size. */
+    private static final int INIT_BUF_SIZE = 4 * 1024;
+
     /** Default class loader. */
     private final ClassLoader dfltClsLdr = getClass().getClassLoader();
 
@@ -101,6 +106,8 @@ public class OptimizedMarshaller extends AbstractMarshaller 
{
      * @param requireSer Whether to require {@link Serializable}.
      */
     public OptimizedMarshaller(boolean requireSer) {
+        this();
+
         this.requireSer = requireSer;
     }
 
@@ -122,25 +129,6 @@ public class OptimizedMarshaller extends 
AbstractMarshaller {
         this.mapper = mapper;
     }
 
-    /**
-     * Specifies size of cached object streams used by marshaller. Object 
streams are cached for
-     * performance reason to avoid costly recreation for every serialization 
routine. If {@code 0} (default),
-     * pool is not used and each thread has its own cached object stream which 
it keeps reusing.
-     * <p>
-     * Since each stream has an internal buffer, creating a stream for each 
thread can lead to
-     * high memory consumption if many large messages are marshalled or 
unmarshalled concurrently.
-     * Consider using pool in this case. This will limit number of streams 
that can be created and,
-     * therefore, decrease memory consumption.
-     * <p>
-     * NOTE: Using streams pool can decrease performance since streams will be 
shared between
-     * different threads which will lead to more frequent context switching.
-     *
-     * @param poolSize Streams pool size. If {@code 0}, pool is not used.
-     */
-    public void setPoolSize(int poolSize) {
-        OptimizedObjectStreamRegistry.poolSize(poolSize);
-    }
-
     /** {@inheritDoc} */
     @Override public void marshal(@Nullable Object obj, OutputStream out) 
throws IgniteCheckedException {
         assert out != null;
@@ -148,7 +136,8 @@ public class OptimizedMarshaller extends AbstractMarshaller 
{
         OptimizedObjectOutputStream objOut = null;
 
         try {
-            objOut = OptimizedObjectStreamRegistry.out();
+            // TODO: IGNITE-471 - Need adaptive initial size.
+            objOut = new OptimizedObjectOutputStream(new 
GridUnsafeDataOutput(INIT_BUF_SIZE));
 
             objOut.context(ctx, mapper, requireSer);
 
@@ -160,7 +149,7 @@ public class OptimizedMarshaller extends AbstractMarshaller 
{
             throw new IgniteCheckedException("Failed to serialize object: " + 
obj, e);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeOut(objOut);
+            U.closeQuiet(objOut);
         }
     }
 
@@ -169,7 +158,8 @@ public class OptimizedMarshaller extends AbstractMarshaller 
{
         OptimizedObjectOutputStream objOut = null;
 
         try {
-            objOut = OptimizedObjectStreamRegistry.out();
+            // TODO: IGNITE-471 - Need adaptive initial size.
+            objOut = new OptimizedObjectOutputStream(new 
GridUnsafeDataOutput(INIT_BUF_SIZE));
 
             objOut.context(ctx, mapper, requireSer);
 
@@ -181,7 +171,7 @@ public class OptimizedMarshaller extends AbstractMarshaller 
{
             throw new IgniteCheckedException("Failed to serialize object: " + 
obj, e);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeOut(objOut);
+            U.closeQuiet(objOut);
         }
     }
 
@@ -193,7 +183,7 @@ public class OptimizedMarshaller extends AbstractMarshaller 
{
         OptimizedObjectInputStream objIn = null;
 
         try {
-            objIn = OptimizedObjectStreamRegistry.in();
+            objIn = new OptimizedObjectInputStream(new GridUnsafeDataInput());
 
             objIn.context(ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr);
 
@@ -210,7 +200,7 @@ public class OptimizedMarshaller extends AbstractMarshaller 
{
                 clsLdr, e);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeIn(objIn);
+            U.closeQuiet(objIn);
         }
     }
 
@@ -223,10 +213,11 @@ public class OptimizedMarshaller extends 
AbstractMarshaller {
         OptimizedObjectInputStream objIn = null;
 
         try {
-            objIn = OptimizedObjectStreamRegistry.in();
+            objIn = new OptimizedObjectInputStream(new GridUnsafeDataInput());
 
             objIn.context(ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr);
 
+            // TODO: IGNITE-471 - Position is not moved.
             objIn.in().bytes(buf.array(), buf.position(), buf.remaining());
 
             return (T)objIn.readObject();
@@ -240,7 +231,7 @@ public class OptimizedMarshaller extends AbstractMarshaller 
{
                 clsLdr, e);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeIn(objIn);
+            U.closeQuiet(objIn);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
 
b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
deleted file mode 100644
index ae28015..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.marshaller.optimized;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.concurrent.*;
-
-/**
- * Storage for object streams.
- */
-class OptimizedObjectStreamRegistry {
-    /** Holders. */
-    private static final ThreadLocal<StreamHolder> holders = new 
ThreadLocal<>();
-
-    /** Holders pool. */
-    private static BlockingQueue<StreamHolder> pool;
-
-    /**
-     * Ensures singleton.
-     */
-    private OptimizedObjectStreamRegistry() {
-        // No-op.
-    }
-
-    /**
-     * Sets streams pool size.
-     *
-     * @param size Streams pool size.
-     */
-    static void poolSize(int size) {
-        if (size > 0) {
-            pool = new LinkedBlockingQueue<>(size);
-
-            for (int i = 0; i < size; i++) {
-                boolean b = pool.offer(new StreamHolder());
-
-                assert b;
-            }
-        }
-        else
-            pool = null;
-    }
-
-    /**
-     * Gets output stream.
-     *
-     * @return Object output stream.
-     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If 
thread is interrupted while trying to take holder from pool.
-     */
-    static OptimizedObjectOutputStream out() throws 
IgniteInterruptedCheckedException {
-        return holder().acquireOut();
-    }
-
-    /**
-     * Gets input stream.
-     *
-     * @return Object input stream.
-     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If 
thread is interrupted while trying to take holder from pool.
-     */
-    static OptimizedObjectInputStream in() throws 
IgniteInterruptedCheckedException {
-        return holder().acquireIn();
-    }
-
-    /**
-     * Closes and releases output stream.
-     *
-     * @param out Object output stream.
-     */
-    static void closeOut(OptimizedObjectOutputStream out) {
-        U.close(out, null);
-
-        StreamHolder holder = holders.get();
-
-        holder.releaseOut();
-
-        if (pool != null) {
-            holders.remove();
-
-            boolean b = pool.offer(holder);
-
-            assert b;
-        }
-    }
-
-    /**
-     * Closes and releases input stream.
-     *
-     * @param in Object input stream.
-     */
-    @SuppressWarnings("TypeMayBeWeakened")
-    static void closeIn(OptimizedObjectInputStream in) {
-        U.close(in, null);
-
-        StreamHolder holder = holders.get();
-
-        holder.releaseIn();
-
-        if (pool != null) {
-            holders.remove();
-
-            boolean b = pool.offer(holder);
-
-            assert b;
-        }
-    }
-
-    /**
-     * Gets holder from pool or thread local.
-     *
-     * @return Stream holder.
-     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If 
thread is interrupted while trying to take holder from pool.
-     */
-    private static StreamHolder holder() throws 
IgniteInterruptedCheckedException {
-        StreamHolder holder = holders.get();
-
-        if (holder == null) {
-            try {
-                holders.set(holder = pool != null ? pool.take() : new 
StreamHolder());
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException("Failed to take 
object stream from pool (thread interrupted).", e);
-            }
-        }
-
-        return holder;
-    }
-
-    /**
-     * Streams holder.
-     */
-    private static class StreamHolder {
-        /** Output stream. */
-        private final OptimizedObjectOutputStream out = createOut();
-
-        /** Input stream. */
-        private final OptimizedObjectInputStream in = createIn();
-
-        /** Output streams counter. */
-        private int outAcquireCnt;
-
-        /** Input streams counter. */
-        private int inAcquireCnt;
-
-        /**
-         * Gets output stream.
-         *
-         * @return Object output stream.
-         */
-        OptimizedObjectOutputStream acquireOut() {
-            return outAcquireCnt++ > 0 ? createOut() : out;
-        }
-
-        /**
-         * Gets input stream.
-         *
-         * @return Object input stream.
-         */
-        OptimizedObjectInputStream acquireIn() {
-            return inAcquireCnt++ > 0 ? createIn() : in;
-        }
-
-        /**
-         * Releases output stream.
-         */
-        void releaseOut() {
-            outAcquireCnt--;
-        }
-
-        /**
-         * Releases input stream.
-         */
-        void releaseIn() {
-            inAcquireCnt--;
-        }
-
-        /**
-         * Creates output stream.
-         *
-         * @return Object output stream.
-         */
-        private OptimizedObjectOutputStream createOut() {
-            try {
-                return new OptimizedObjectOutputStream(new 
GridUnsafeDataOutput(4 * 1024));
-            }
-            catch (IOException e) {
-                throw new IgniteException("Failed to create object output 
stream.", e);
-            }
-        }
-
-        /**
-         * Creates input stream.
-         *
-         * @return Object input stream.
-         */
-        private OptimizedObjectInputStream createIn() {
-            try {
-                return new OptimizedObjectInputStream(new 
GridUnsafeDataInput());
-            }
-            catch (IOException e) {
-                throw new IgniteException("Failed to create object input 
stream.", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 1278556..c46c532 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1677,7 +1677,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             Map<String, Object> attrs = new HashMap<>(node.getAttributes());
 
             attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
-                
marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+                
U.toArray(marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))));
 
             node.setAttributes(attrs);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java
index 09adbb6..eea9a32 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java
@@ -239,41 +239,6 @@ public class OptimizedObjectStreamSelfTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testPool() throws Exception {
-        final TestObject obj = new TestObject();
-
-        obj.longVal = 100L;
-        obj.doubleVal = 100.0d;
-        obj.longArr = new Long[100 * 1024];
-        obj.doubleArr = new Double[100 * 1024];
-
-        Arrays.fill(obj.longArr, 100L);
-        Arrays.fill(obj.doubleArr, 100.0d);
-
-        final OptimizedMarshaller marsh = new OptimizedMarshaller();
-
-        marsh.setContext(CTX);
-
-        marsh.setPoolSize(5);
-
-        try {
-            multithreaded(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    for (int i = 0; i < 50; i++)
-                        assertEquals(obj, marsh.unmarshal(marsh.marshal(obj), 
null));
-
-                    return null;
-                }
-            }, 20);
-        }
-        finally {
-            marsh.setPoolSize(0);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testObjectWithNulls() throws Exception {
         TestObject obj = new TestObject();
 
@@ -883,7 +848,7 @@ public class OptimizedObjectStreamSelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testReadToArray() throws Exception {
-        OptimizedObjectInputStream in = OptimizedObjectStreamRegistry.in();
+        OptimizedObjectInputStream in = new OptimizedObjectInputStream(new 
GridUnsafeDataInput());
 
         try {
             byte[] arr = new byte[50];
@@ -922,7 +887,7 @@ public class OptimizedObjectStreamSelfTest extends 
GridCommonAbstractTest {
                 assertEquals(i < 10 ? 40 + i : 0, buf[i]);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeIn(in);
+            U.closeQuiet(in);
         }
     }
 
@@ -1020,7 +985,7 @@ public class OptimizedObjectStreamSelfTest extends 
GridCommonAbstractTest {
         OptimizedObjectInputStream in = null;
 
         try {
-            out = OptimizedObjectStreamRegistry.out();
+            out = new OptimizedObjectOutputStream(new 
GridUnsafeDataOutput(512));
 
             out.context(CTX, null, true);
 
@@ -1028,7 +993,7 @@ public class OptimizedObjectStreamSelfTest extends 
GridCommonAbstractTest {
 
             byte[] arr = out.out().array();
 
-            in = OptimizedObjectStreamRegistry.in();
+            in = new OptimizedObjectInputStream(new GridUnsafeDataInput());
 
             in.context(CTX, null, getClass().getClassLoader());
 
@@ -1041,8 +1006,8 @@ public class OptimizedObjectStreamSelfTest extends 
GridCommonAbstractTest {
             return (T)obj0;
         }
         finally {
-            OptimizedObjectStreamRegistry.closeOut(out);
-            OptimizedObjectStreamRegistry.closeIn(in);
+            U.closeQuiet(out);
+            U.closeQuiet(in);
         }
     }
 

Reply via email to