GG-9900 - Portable marshaller refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/15b4ebb1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/15b4ebb1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/15b4ebb1 Branch: refs/heads/ignite-471 Commit: 15b4ebb19875cba90bfcebe8a2f62fbe8c3f8213 Parents: d838a9d Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Mar 11 18:30:04 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Mar 11 18:30:04 2015 -0700 ---------------------------------------------------------------------- .../GridClientOptimizedMarshaller.java | 7 +- .../optimized/OptimizedMarshaller.java | 45 ++-- .../OptimizedObjectStreamRegistry.java | 224 ------------------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../OptimizedObjectStreamSelfTest.java | 47 +--- 5 files changed, 27 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java index 00f96b8..ff77177 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java @@ -50,17 +50,14 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller { * Constructs optimized marshaller with specific parameters. * * @param requireSer Require serializable flag. - * @param poolSize Object streams pool size. * @throws IOException If an I/O error occurs while writing stream header. * @throws IgniteException If this marshaller is not supported on the current JVM. * @see OptimizedMarshaller */ - public GridClientOptimizedMarshaller(boolean requireSer, int poolSize) throws IOException { - opMarsh = new OptimizedMarshaller(); + public GridClientOptimizedMarshaller(boolean requireSer) throws IOException { + opMarsh = new OptimizedMarshaller(requireSer); opMarsh.setContext(new ClientMarshallerContext()); - opMarsh.setRequireSerializable(requireSer); - opMarsh.setPoolSize(poolSize); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java index bb63fdd..8eaf23b 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java @@ -19,6 +19,8 @@ package org.apache.ignite.marshaller.optimized; import org.apache.ignite.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; import org.jetbrains.annotations.*; import sun.misc.*; @@ -74,6 +76,9 @@ import java.nio.*; * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> */ public class OptimizedMarshaller extends AbstractMarshaller { + /** Initial buffer size. */ + private static final int INIT_BUF_SIZE = 4 * 1024; + /** Default class loader. */ private final ClassLoader dfltClsLdr = getClass().getClassLoader(); @@ -101,6 +106,8 @@ public class OptimizedMarshaller extends AbstractMarshaller { * @param requireSer Whether to require {@link Serializable}. */ public OptimizedMarshaller(boolean requireSer) { + this(); + this.requireSer = requireSer; } @@ -122,25 +129,6 @@ public class OptimizedMarshaller extends AbstractMarshaller { this.mapper = mapper; } - /** - * Specifies size of cached object streams used by marshaller. Object streams are cached for - * performance reason to avoid costly recreation for every serialization routine. If {@code 0} (default), - * pool is not used and each thread has its own cached object stream which it keeps reusing. - * <p> - * Since each stream has an internal buffer, creating a stream for each thread can lead to - * high memory consumption if many large messages are marshalled or unmarshalled concurrently. - * Consider using pool in this case. This will limit number of streams that can be created and, - * therefore, decrease memory consumption. - * <p> - * NOTE: Using streams pool can decrease performance since streams will be shared between - * different threads which will lead to more frequent context switching. - * - * @param poolSize Streams pool size. If {@code 0}, pool is not used. - */ - public void setPoolSize(int poolSize) { - OptimizedObjectStreamRegistry.poolSize(poolSize); - } - /** {@inheritDoc} */ @Override public void marshal(@Nullable Object obj, OutputStream out) throws IgniteCheckedException { assert out != null; @@ -148,7 +136,8 @@ public class OptimizedMarshaller extends AbstractMarshaller { OptimizedObjectOutputStream objOut = null; try { - objOut = OptimizedObjectStreamRegistry.out(); + // TODO: IGNITE-471 - Need adaptive initial size. + objOut = new OptimizedObjectOutputStream(new GridUnsafeDataOutput(INIT_BUF_SIZE)); objOut.context(ctx, mapper, requireSer); @@ -160,7 +149,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { throw new IgniteCheckedException("Failed to serialize object: " + obj, e); } finally { - OptimizedObjectStreamRegistry.closeOut(objOut); + U.closeQuiet(objOut); } } @@ -169,7 +158,8 @@ public class OptimizedMarshaller extends AbstractMarshaller { OptimizedObjectOutputStream objOut = null; try { - objOut = OptimizedObjectStreamRegistry.out(); + // TODO: IGNITE-471 - Need adaptive initial size. + objOut = new OptimizedObjectOutputStream(new GridUnsafeDataOutput(INIT_BUF_SIZE)); objOut.context(ctx, mapper, requireSer); @@ -181,7 +171,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { throw new IgniteCheckedException("Failed to serialize object: " + obj, e); } finally { - OptimizedObjectStreamRegistry.closeOut(objOut); + U.closeQuiet(objOut); } } @@ -193,7 +183,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { OptimizedObjectInputStream objIn = null; try { - objIn = OptimizedObjectStreamRegistry.in(); + objIn = new OptimizedObjectInputStream(new GridUnsafeDataInput()); objIn.context(ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr); @@ -210,7 +200,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { clsLdr, e); } finally { - OptimizedObjectStreamRegistry.closeIn(objIn); + U.closeQuiet(objIn); } } @@ -223,10 +213,11 @@ public class OptimizedMarshaller extends AbstractMarshaller { OptimizedObjectInputStream objIn = null; try { - objIn = OptimizedObjectStreamRegistry.in(); + objIn = new OptimizedObjectInputStream(new GridUnsafeDataInput()); objIn.context(ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr); + // TODO: IGNITE-471 - Position is not moved. objIn.in().bytes(buf.array(), buf.position(), buf.remaining()); return (T)objIn.readObject(); @@ -240,7 +231,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { clsLdr, e); } finally { - OptimizedObjectStreamRegistry.closeIn(objIn); + U.closeQuiet(objIn); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java deleted file mode 100644 index ae28015..0000000 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.marshaller.optimized; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.concurrent.*; - -/** - * Storage for object streams. - */ -class OptimizedObjectStreamRegistry { - /** Holders. */ - private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>(); - - /** Holders pool. */ - private static BlockingQueue<StreamHolder> pool; - - /** - * Ensures singleton. - */ - private OptimizedObjectStreamRegistry() { - // No-op. - } - - /** - * Sets streams pool size. - * - * @param size Streams pool size. - */ - static void poolSize(int size) { - if (size > 0) { - pool = new LinkedBlockingQueue<>(size); - - for (int i = 0; i < size; i++) { - boolean b = pool.offer(new StreamHolder()); - - assert b; - } - } - else - pool = null; - } - - /** - * Gets output stream. - * - * @return Object output stream. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool. - */ - static OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException { - return holder().acquireOut(); - } - - /** - * Gets input stream. - * - * @return Object input stream. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool. - */ - static OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException { - return holder().acquireIn(); - } - - /** - * Closes and releases output stream. - * - * @param out Object output stream. - */ - static void closeOut(OptimizedObjectOutputStream out) { - U.close(out, null); - - StreamHolder holder = holders.get(); - - holder.releaseOut(); - - if (pool != null) { - holders.remove(); - - boolean b = pool.offer(holder); - - assert b; - } - } - - /** - * Closes and releases input stream. - * - * @param in Object input stream. - */ - @SuppressWarnings("TypeMayBeWeakened") - static void closeIn(OptimizedObjectInputStream in) { - U.close(in, null); - - StreamHolder holder = holders.get(); - - holder.releaseIn(); - - if (pool != null) { - holders.remove(); - - boolean b = pool.offer(holder); - - assert b; - } - } - - /** - * Gets holder from pool or thread local. - * - * @return Stream holder. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool. - */ - private static StreamHolder holder() throws IgniteInterruptedCheckedException { - StreamHolder holder = holders.get(); - - if (holder == null) { - try { - holders.set(holder = pool != null ? pool.take() : new StreamHolder()); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException("Failed to take object stream from pool (thread interrupted).", e); - } - } - - return holder; - } - - /** - * Streams holder. - */ - private static class StreamHolder { - /** Output stream. */ - private final OptimizedObjectOutputStream out = createOut(); - - /** Input stream. */ - private final OptimizedObjectInputStream in = createIn(); - - /** Output streams counter. */ - private int outAcquireCnt; - - /** Input streams counter. */ - private int inAcquireCnt; - - /** - * Gets output stream. - * - * @return Object output stream. - */ - OptimizedObjectOutputStream acquireOut() { - return outAcquireCnt++ > 0 ? createOut() : out; - } - - /** - * Gets input stream. - * - * @return Object input stream. - */ - OptimizedObjectInputStream acquireIn() { - return inAcquireCnt++ > 0 ? createIn() : in; - } - - /** - * Releases output stream. - */ - void releaseOut() { - outAcquireCnt--; - } - - /** - * Releases input stream. - */ - void releaseIn() { - inAcquireCnt--; - } - - /** - * Creates output stream. - * - * @return Object output stream. - */ - private OptimizedObjectOutputStream createOut() { - try { - return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024)); - } - catch (IOException e) { - throw new IgniteException("Failed to create object output stream.", e); - } - } - - /** - * Creates input stream. - * - * @return Object input stream. - */ - private OptimizedObjectInputStream createIn() { - try { - return new OptimizedObjectInputStream(new GridUnsafeDataInput()); - } - catch (IOException e) { - throw new IgniteException("Failed to create object input stream.", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 1278556..c46c532 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1677,7 +1677,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov Map<String, Object> attrs = new HashMap<>(node.getAttributes()); attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, - marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))); + U.toArray(marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)))); node.setAttributes(attrs); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15b4ebb1/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java index 09adbb6..eea9a32 100644 --- a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamSelfTest.java @@ -239,41 +239,6 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testPool() throws Exception { - final TestObject obj = new TestObject(); - - obj.longVal = 100L; - obj.doubleVal = 100.0d; - obj.longArr = new Long[100 * 1024]; - obj.doubleArr = new Double[100 * 1024]; - - Arrays.fill(obj.longArr, 100L); - Arrays.fill(obj.doubleArr, 100.0d); - - final OptimizedMarshaller marsh = new OptimizedMarshaller(); - - marsh.setContext(CTX); - - marsh.setPoolSize(5); - - try { - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - for (int i = 0; i < 50; i++) - assertEquals(obj, marsh.unmarshal(marsh.marshal(obj), null)); - - return null; - } - }, 20); - } - finally { - marsh.setPoolSize(0); - } - } - - /** - * @throws Exception If failed. - */ public void testObjectWithNulls() throws Exception { TestObject obj = new TestObject(); @@ -883,7 +848,7 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testReadToArray() throws Exception { - OptimizedObjectInputStream in = OptimizedObjectStreamRegistry.in(); + OptimizedObjectInputStream in = new OptimizedObjectInputStream(new GridUnsafeDataInput()); try { byte[] arr = new byte[50]; @@ -922,7 +887,7 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest { assertEquals(i < 10 ? 40 + i : 0, buf[i]); } finally { - OptimizedObjectStreamRegistry.closeIn(in); + U.closeQuiet(in); } } @@ -1020,7 +985,7 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest { OptimizedObjectInputStream in = null; try { - out = OptimizedObjectStreamRegistry.out(); + out = new OptimizedObjectOutputStream(new GridUnsafeDataOutput(512)); out.context(CTX, null, true); @@ -1028,7 +993,7 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest { byte[] arr = out.out().array(); - in = OptimizedObjectStreamRegistry.in(); + in = new OptimizedObjectInputStream(new GridUnsafeDataInput()); in.context(CTX, null, getClass().getClassLoader()); @@ -1041,8 +1006,8 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest { return (T)obj0; } finally { - OptimizedObjectStreamRegistry.closeOut(out); - OptimizedObjectStreamRegistry.closeIn(in); + U.closeQuiet(out); + U.closeQuiet(in); } }