ignite-950: refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/55c7c9fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/55c7c9fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/55c7c9fa Branch: refs/heads/ignite-950 Commit: 55c7c9fa9c89bc20ffdf732a2594909e3c155b87 Parents: 28f8eda Author: Denis Magda <dma...@gridgain.com> Authored: Mon Jun 29 18:19:59 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Mon Jun 29 18:19:59 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 2 +- .../cache/CacheIndexedObjectImpl.java | 4 +- .../IgniteCacheObjectProcessorImpl.java | 38 +- .../processors/query/GridQueryProcessor.java | 4 +- .../optimized/OptimizedClassDescriptor.java | 17 +- .../optimized/OptimizedMarshaller.java | 166 +++++++- .../optimized/OptimizedMarshallerExt.java | 405 ------------------- .../OptimizedMarshallerIndexingHandler.java | 223 ++++++++++ .../OptimizedMarshallerProtocolVersion.java | 32 ++ .../optimized/OptimizedMarshallerUtils.java | 35 +- .../optimized/OptimizedObjectInputStream.java | 79 ++-- .../OptimizedObjectInputStreamExt.java | 52 --- .../optimized/OptimizedObjectOutputStream.java | 128 +++--- .../OptimizedObjectOutputStreamExt.java | 112 ----- .../OptimizedObjectStreamExtRegistry.java | 225 ----------- .../OptimizedMarshallerExtSelfTest.java | 71 ++-- .../OptimizedObjectStreamSelfTest.java | 4 +- .../junits/IgniteTestResources.java | 3 +- ...acheOptimizedMarshallerExtQuerySelfTest.java | 15 +- 19 files changed, 617 insertions(+), 998 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 7356d85..5cbe377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1725,7 +1725,7 @@ public class IgnitionEx { marsh = new JdkMarshaller(); } else - marsh = new OptimizedMarshallerExt(); + marsh = new OptimizedMarshaller(); } myCfg.setMarshaller(marsh); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheIndexedObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheIndexedObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheIndexedObjectImpl.java index 7963245..66631ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheIndexedObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheIndexedObjectImpl.java @@ -169,7 +169,7 @@ public class CacheIndexedObjectImpl extends CacheObjectAdapter { * @return {@code true} if has. * @throws IgniteCheckedException In case of error. */ - public boolean hasField(String fieldName, OptimizedMarshallerExt marsh, Field field) throws IgniteCheckedException { + public boolean hasField(String fieldName, OptimizedMarshaller marsh, Field field) throws IgniteCheckedException { if (field != null && val != null) { try { field.get(val); @@ -195,7 +195,7 @@ public class CacheIndexedObjectImpl extends CacheObjectAdapter { * @throws IgniteFieldNotFoundException In case if there is no such a field. * @throws IgniteCheckedException In case of error. 
*/ - public Object field(String fieldName, OptimizedMarshallerExt marsh, Field field) throws IgniteCheckedException { + public Object field(String fieldName, OptimizedMarshaller marsh, Field field) throws IgniteCheckedException { if (field != null && val != null) { try { return field.get(val); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 3bca8ee..bba6966 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -61,7 +61,10 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme private final CountDownLatch startLatch = new CountDownLatch(1); /** */ - private OptimizedMarshallerExt optMarshExt; + private OptimizedMarshallerIndexingHandler indexingMgr; + + /** */ + private OptimizedMarshaller optMarsh; /** * @@ -87,8 +90,10 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme Marshaller marsh = ctx.config().getMarshaller(); - if (marsh instanceof OptimizedMarshallerExt) { - optMarshExt = (OptimizedMarshallerExt)marsh; + if (marsh instanceof OptimizedMarshaller) { + optMarsh = (OptimizedMarshaller)marsh; + + indexingMgr = new OptimizedMarshallerIndexingHandler(); OptimizedMarshallerMetaHandler metaHandler = new OptimizedMarshallerMetaHandler() { @Override public void addMeta(int typeId, OptimizedObjectMetadata meta) { @@ -121,7 +126,8 @@ public class 
IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } }; - optMarshExt.setMetadataHandler(metaHandler); + indexingMgr.setMetaHandler(metaHandler); + optMarsh.setIndexingHandler(indexingMgr); } } @@ -164,9 +170,9 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** {@inheritDoc} */ @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, int off, int len, - ClassLoader clsLdr) throws IgniteCheckedException { - if (optMarshExt != null) - return optMarshExt.unmarshal(bytes, off, len, clsLdr); + ClassLoader clsLdr) throws IgniteCheckedException { + if (optMarsh != null) + return optMarsh.unmarshal(bytes, off, len, clsLdr); else if (off > 0 || len != bytes.length) { byte[] arr = new byte[len]; @@ -307,7 +313,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** {@inheritDoc} */ @Override public int typeId(String typeName) { - return optMarshExt != null ? OptimizedMarshallerUtils.resolveTypeId(typeName, optMarshExt.idMapper()) : 0; + return indexingMgr != null ? 
OptimizedMarshallerUtils.resolveTypeId(typeName, indexingMgr.idMapper()) : 0; } /** {@inheritDoc} */ @@ -335,10 +341,10 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** {@inheritDoc} */ @Override public Object field(Object obj, String fieldName, Field field) throws IgniteFieldNotFoundException { - assert optMarshExt != null; + assert indexingMgr != null; try { - return ((CacheIndexedObjectImpl)obj).field(fieldName, optMarshExt, field); + return ((CacheIndexedObjectImpl)obj).field(fieldName, optMarsh, field); } catch (IgniteFieldNotFoundException e) { throw e; @@ -355,10 +361,10 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** {@inheritDoc} */ @Override public boolean hasField(Object obj, String fieldName, Field field) { if (obj instanceof CacheIndexedObjectImpl) { - assert optMarshExt != null; + assert indexingMgr != null; try { - return ((CacheIndexedObjectImpl)obj).hasField(fieldName, optMarshExt, null); + return ((CacheIndexedObjectImpl)obj).hasField(fieldName, optMarsh, null); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -370,17 +376,19 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** {@inheritDoc} */ @Override public boolean isFieldsIndexingEnabled() { - return optMarshExt != null; + return indexingMgr != null && indexingMgr.isFieldsIndexingSupported(); } /** {@inheritDoc} */ @Override public boolean isFieldsIndexingEnabled(Class<?> cls) { - return optMarshExt != null && optMarshExt.fieldsIndexingEnabled(cls); + return indexingMgr != null && indexingMgr.isFieldsIndexingSupported() && + indexingMgr.fieldsIndexingEnabledForClass(cls); } /** {@inheritDoc} */ @Override public boolean enableFieldsIndexing(Class<?> cls) throws IgniteCheckedException { - return optMarshExt != null && optMarshExt.enableFieldsIndexing(cls); + return indexingMgr != null && indexingMgr.isFieldsIndexingSupported() && + 
indexingMgr.enableFieldsIndexingForClass(cls); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 14e69bf..e00acbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -1765,7 +1765,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { this.parent = parent; this.type = type; - if (keyCls != null) { + /*if (keyCls != null) { try { keyField = keyCls.getDeclaredField(propName); @@ -1786,7 +1786,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { if ((keyCls != null || valCls != null) && keyField == null && valueField == null) U.warn(log, "Neither key nor value class has field " + - "[fieldName=" + propName + ", key=" + keyCls + ", val=" + valCls + "]"); + "[fieldName=" + propName + ", key=" + keyCls + ", val=" + valCls + "]");*/ } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java index 3596548..455e4db 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java +++ 
b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java @@ -50,6 +50,9 @@ public class OptimizedClassDescriptor { /** ID mapper. */ private final OptimizedMarshallerIdMapper mapper; + /** Indexing manager. */ + private final OptimizedMarshallerIndexingHandler idxHandler; + /** Class name. */ private final String name; @@ -115,6 +118,7 @@ public class OptimizedClassDescriptor { * @param cls Class. * @param ctx Context. * @param mapper ID mapper. + * @param idxHandler Fields indexing manager. * @throws IOException In case of error. */ @SuppressWarnings("ForLoopReplaceableByForEach") @@ -122,13 +126,15 @@ public class OptimizedClassDescriptor { int typeId, ConcurrentMap<Class, OptimizedClassDescriptor> clsMap, MarshallerContext ctx, - OptimizedMarshallerIdMapper mapper) + OptimizedMarshallerIdMapper mapper, + OptimizedMarshallerIndexingHandler idxHandler) throws IOException { this.cls = cls; this.typeId = typeId; this.clsMap = clsMap; this.ctx = ctx; this.mapper = mapper; + this.idxHandler = idxHandler; name = cls.getName(); @@ -648,7 +654,8 @@ public class OptimizedClassDescriptor { OptimizedClassDescriptor compDesc = classDescriptor(clsMap, obj.getClass().getComponentType(), ctx, - mapper); + mapper, + idxHandler); compDesc.writeTypeData(out); @@ -707,7 +714,7 @@ public class OptimizedClassDescriptor { break; case CLS: - OptimizedClassDescriptor clsDesc = classDescriptor(clsMap, (Class<?>)obj, ctx, mapper); + OptimizedClassDescriptor clsDesc = classDescriptor(clsMap, (Class<?>)obj, ctx, mapper, idxHandler); clsDesc.writeTypeData(out); @@ -734,12 +741,12 @@ public class OptimizedClassDescriptor { out.writeShort(checksum); out.writeMarshalAware(obj); - if (out.metaHandler.metadata(typeId) == null) { + if (idxHandler.metaHandler().metadata(typeId) == null) { OptimizedMarshalAwareMetaCollector collector = new OptimizedMarshalAwareMetaCollector(); ((OptimizedMarshalAware)obj).writeFields(collector); - 
out.metaHandler.addMeta(typeId, collector.meta()); + idxHandler.metaHandler().addMeta(typeId, collector.meta()); } break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java index a5b0cc4..c8e2dcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java @@ -18,6 +18,7 @@ package org.apache.ignite.marshaller.optimized; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; @@ -77,16 +78,22 @@ import java.util.concurrent.*; */ public class OptimizedMarshaller extends AbstractMarshaller { /** Default class loader. */ - protected final ClassLoader dfltClsLdr = getClass().getClassLoader(); + private final ClassLoader dfltClsLdr = getClass().getClassLoader(); /** Whether or not to require an object to be serializable in order to be marshalled. */ - protected boolean requireSer = true; + private boolean requireSer = true; /** ID mapper. */ - protected OptimizedMarshallerIdMapper mapper; + private OptimizedMarshallerIdMapper mapper; + + /** */ + private OptimizedMarshallerProtocolVersion protocolVersion = OptimizedMarshallerProtocolVersion.VER_1_1; + + /** */ + private OptimizedMarshallerIndexingHandler idxHandler; /** Class descriptors by class. 
*/ - protected final ConcurrentMap<Class, OptimizedClassDescriptor> clsMap = new ConcurrentHashMap8<>(); + private final ConcurrentMap<Class, OptimizedClassDescriptor> clsMap = new ConcurrentHashMap8<>(); /** * Creates new marshaller will all defaults. @@ -128,6 +135,24 @@ public class OptimizedMarshaller extends AbstractMarshaller { } /** + * Sets protocol version. + * + * @param protocolVersion Protocol version. + */ + public void setProtocolVersion(OptimizedMarshallerProtocolVersion protocolVersion) { + this.protocolVersion = protocolVersion; + } + + /** + * Gets marshaller's protocol version. + * + * @return Protocol version. + */ + public OptimizedMarshallerProtocolVersion getProtocolVersion() { + return protocolVersion; + } + + /** * Specifies size of cached object streams used by marshaller. Object streams are cached for * performance reason to avoid costly recreation for every serialization routine. If {@code 0} (default), * pool is not used and each thread has its own cached object stream which it keeps reusing. @@ -146,6 +171,18 @@ public class OptimizedMarshaller extends AbstractMarshaller { OptimizedObjectStreamRegistry.poolSize(poolSize); } + /** + * Sets fields indexing handler. 
+ */ + public void setIndexingHandler(OptimizedMarshallerIndexingHandler idxHandler) { + this.idxHandler = idxHandler; + + idxHandler.setClassMap(clsMap); + idxHandler.setProtocolVersion(protocolVersion); + idxHandler.setIdMapper(mapper); + idxHandler.setMarshallerCtx(ctx); + } + /** {@inheritDoc} */ @Override public void marshal(@Nullable Object obj, OutputStream out) throws IgniteCheckedException { assert out != null; @@ -155,7 +192,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { try { objOut = OptimizedObjectStreamRegistry.out(); - objOut.context(clsMap, ctx, mapper, requireSer); + objOut.context(clsMap, ctx, mapper, requireSer, idxHandler); objOut.out().outputStream(out); @@ -176,7 +213,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { try { objOut = OptimizedObjectStreamRegistry.out(); - objOut.context(clsMap, ctx, mapper, requireSer); + objOut.context(clsMap, ctx, mapper, requireSer, idxHandler); objOut.writeObject(obj); @@ -200,7 +237,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { try { objIn = OptimizedObjectStreamRegistry.in(); - objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr); + objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr, idxHandler); objIn.in().inputStream(in); @@ -229,7 +266,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { try { objIn = OptimizedObjectStreamRegistry.in(); - objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr); + objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr, idxHandler); objIn.in().bytes(arr, arr.length); @@ -249,6 +286,119 @@ public class OptimizedMarshaller extends AbstractMarshaller { } /** + * Unmarshals object from byte array using given class loader and offset with len. + * + * @param <T> Type of unmarshalled object. + * @param arr Byte array. + * @param off Object's offset in the array. + * @param len Object's length in the array. 
+ * @param clsLdr Class loader to use. + * @return Unmarshalled object. + * @throws IgniteCheckedException If unmarshalling failed. + */ + public <T> T unmarshal(byte[] arr, int off, int len, @Nullable ClassLoader clsLdr) throws IgniteCheckedException { + assert arr != null; + + OptimizedObjectInputStream objIn = null; + + try { + objIn = OptimizedObjectStreamRegistry.in(); + + objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr, idxHandler); + + objIn.in().bytes(arr, off, len); + + return (T)objIn.readObject(); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " + + "(make sure same version of all classes are available on all nodes or" + + " enable peer-class-loading): " + clsLdr, e); + } + finally { + OptimizedObjectStreamRegistry.closeIn(objIn); + } + } + + /** + * Checks whether object, serialized to byte array {@code arr}, has a field with name {@code fieldName}. + * + * @param fieldName Field name. + * @param arr Object's serialized form. + * @param off Object's start off. + * @param len Object's len. + * @return {@code true} if field exists. + */ + public boolean hasField(String fieldName, byte[] arr, int off, int len) throws IgniteCheckedException { + assert arr != null && fieldName != null; + + OptimizedObjectInputStream objIn = null; + + try { + objIn = OptimizedObjectStreamRegistry.in(); + + objIn.context(clsMap, ctx, mapper, dfltClsLdr, idxHandler); + + objIn.in().bytes(arr, off, len); + + return objIn.hasField(fieldName); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to find field with name: " + fieldName, e); + } + finally { + OptimizedObjectStreamRegistry.closeIn(objIn); + } + } + + /** + * Looks up field with the given name and returns it in one of the following representations. 
If the field is + * serializable and has a footer then it's not deserialized but rather returned wrapped by {@link CacheObjectImpl} + * for future processing. In all other cases the field is fully deserialized. + * + * @param fieldName Field name. + * @param arr Object's serialized form. + * @param off Object's start offset. + * @param len Object's len. + * @param clsLdr Class loader. + * @param <T> Expected field class. + * @return Field. + * @throws IgniteFieldNotFoundException In case if there is no such a field. + * @throws IgniteCheckedException In case of error. + */ + public <T> T readField(String fieldName, byte[] arr, int off, int len, @Nullable ClassLoader clsLdr) + throws IgniteCheckedException { + + assert arr != null && fieldName != null; + + OptimizedObjectInputStream objIn = null; + + try { + objIn = OptimizedObjectStreamRegistry.in(); + + objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr, idxHandler); + + objIn.in().bytes(arr, off, len); + + return objIn.readField(fieldName); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to find field with name: " + fieldName, e); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " + + "(make sure same version of all classes are available on all nodes or" + + " enable peer-class-loading): " + clsLdr, e); + } + finally { + OptimizedObjectStreamRegistry.closeIn(objIn); + } + } + + /** * Checks whether {@code GridOptimizedMarshaller} is able to work on the current JVM. 
* <p> * As long as {@code GridOptimizedMarshaller} uses JVM-private API, which is not guaranteed http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerExt.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerExt.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerExt.java deleted file mode 100644 index 91e5a9a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerExt.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.marshaller.optimized; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.services.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.concurrent.*; - -import static org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.*; - -/** - * TODO - */ -public class OptimizedMarshallerExt extends OptimizedMarshaller { - /** */ - static final byte EMPTY_FOOTER = -1; - - /** */ - static final byte FOOTER_LEN_OFF = 2; - - /** */ - static final byte FOOTER_HANDLES_FLAG_OFF = 3; - - /** */ - static final int FOOTER_BODY_OFF_MASK = 0x3FFFFFFF; - - /** */ - static final int FOOTER_BODY_IS_HANDLE_MASK = 0x40000000; - - /** */ - static final byte FOOTER_BODY_HANDLE_MASK_BIT = 30; - - /** */ - private final static ConcurrentHashMap<Class<?>, Boolean> indexingEnabledCache = new ConcurrentHashMap<>(); - - /** */ - private volatile OptimizedMarshallerMetaHandler metaHandler; - - /** - * Creates new marshaller will all defaults. - * - * @throws IgniteException If this marshaller is not supported on the current JVM. - */ - public OptimizedMarshallerExt() { - // No-op - } - - /** - * Creates new marshaller providing whether it should - * require {@link Serializable} interface or not. - * - * @param requireSer Whether to require {@link Serializable}. - */ - public OptimizedMarshallerExt(boolean requireSer) { - super(requireSer); - } - - /** - * Sets metadata handler. - * - * @param metaHandler Metadata handler. - */ - public void setMetadataHandler(OptimizedMarshallerMetaHandler metaHandler) { - this.metaHandler = metaHandler; - } - - /** - * Returns currently set ID mapper. - * - * @return ID mapper. - */ - public OptimizedMarshallerIdMapper idMapper() { - return mapper; - } - - /** - * Checks whether fields indexing is excluded for class. - * - * @param cls Class. 
- * @return {@code true} if excluded. - */ - static boolean isFieldsIndexingExcludedForClass(MarshallerContext ctx, Class<?> cls) { - return ctx.isSystemType(cls.getName()) || Service.class.isAssignableFrom(cls) || - ComputeTask.class.isAssignableFrom(cls); - } - - /** - * Checks whether fields indexing is enabled for objects of the given {@code cls}. - * - * @param cls Class. - * @param metaHandler Metadata handler. - * @param ctx Marshaller context. - * @param clsMap Class map. - * @param mapper ID Mapper. - * @return {@code true} if fields indexing is enabled. - */ - static boolean fieldsIndexingSupported(Class<?> cls, OptimizedMarshallerMetaHandler metaHandler, - MarshallerContext ctx, ConcurrentMap<Class, OptimizedClassDescriptor> clsMap, - OptimizedMarshallerIdMapper mapper) { - Boolean res = indexingEnabledCache.get(cls); - - if (res != null) - return res; - - if (isFieldsIndexingExcludedForClass(ctx, cls)) - res = false; - else if (OptimizedMarshalAware.class.isAssignableFrom(cls)) - res = true; - else { - try { - OptimizedClassDescriptor desc = OptimizedMarshallerUtils.classDescriptor(clsMap, cls, ctx, mapper); - - res = desc.fields() != null && desc.fields().fieldsIndexingSupported() && metaHandler != null && - metaHandler.metadata(desc.typeId()) != null; - } catch (IOException e) { - throw new IgniteException("Failed to load class description: " + cls); - } - } - - synchronized (indexingEnabledCache) { - indexingEnabledCache.putIfAbsent(cls, res); - } - - return res; - } - - /** - * Enables fields indexing for the object of the given {@code cls}. - * - * If enabled then a footer will be added during marshalling of an object of the given {@code cls} to the end of - * its serialized form. - * - * @param cls Class. - * @return {@code true} if fields indexing is enabled. - * @throws IgniteCheckedException In case of error. 
- */ - public boolean enableFieldsIndexing(Class<?> cls) throws IgniteCheckedException { - assert metaHandler != null; - - boolean res; - - if (isFieldsIndexingExcludedForClass(ctx, cls)) - res = false; - else if (OptimizedMarshalAware.class.isAssignableFrom(cls)) - res = true; - else { - try { - OptimizedClassDescriptor desc = OptimizedMarshallerUtils.classDescriptor(clsMap, cls, ctx, mapper); - - if (desc.fields() != null && desc.fields().fieldsIndexingSupported()) { - OptimizedObjectMetadata meta = new OptimizedObjectMetadata(); - - for (ClassFields clsFields : desc.fields().fieldsList()) - for (FieldInfo info : clsFields.fieldInfoList()) - meta.addField(info.name(), info.type()); - - metaHandler.addMeta(desc.typeId(), meta); - - res = true; - } - else - res = false; - - } catch (IOException e) { - throw new IgniteCheckedException("Failed to put meta for class: " + cls.getName(), e); - } - } - - synchronized (indexingEnabledCache) { - indexingEnabledCache.put(cls, res); - } - - return res; - } - - /** - * Checks whether fields indexing is enabled for objects of the given {@code cls}. - * - * @param cls Class. - * @return {@code true} if fields indexing is enabled. 
- */ - public boolean fieldsIndexingEnabled(Class<?> cls) { - assert metaHandler != null; - - return fieldsIndexingSupported(cls, metaHandler, ctx, clsMap, mapper); - } - - /** {@inheritDoc} */ - @Override public void setPoolSize(int poolSize) { - OptimizedObjectStreamExtRegistry.poolSize(poolSize); - } - - /** {@inheritDoc} */ - @Override public void marshal(@Nullable Object obj, OutputStream out) throws IgniteCheckedException { - assert out != null; - - OptimizedObjectOutputStreamExt objOut = null; - - try { - objOut = OptimizedObjectStreamExtRegistry.out(); - - objOut.context(clsMap, ctx, mapper, requireSer, metaHandler); - - objOut.out().outputStream(out); - - objOut.writeObject(obj); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to serialize object: " + obj, e); - } - finally { - OptimizedObjectStreamExtRegistry.closeOut(objOut); - } - } - - /** {@inheritDoc} */ - @Override public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException { - OptimizedObjectOutputStreamExt objOut = null; - - try { - objOut = OptimizedObjectStreamExtRegistry.out(); - - objOut.context(clsMap, ctx, mapper, requireSer, metaHandler); - - objOut.writeObject(obj); - - return objOut.out().array(); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to serialize object: " + obj, e); - } - finally { - OptimizedObjectStreamExtRegistry.closeOut(objOut); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <T> T unmarshal(InputStream in, @Nullable ClassLoader clsLdr) throws IgniteCheckedException { - assert in != null; - - OptimizedObjectInputStreamExt objIn = null; - - try { - objIn = OptimizedObjectStreamExtRegistry.in(); - - objIn.context(clsMap, ctx, mapper, clsLdr != null ? 
clsLdr : dfltClsLdr, metaHandler); - - objIn.in().inputStream(in); - - return (T)objIn.readObject(); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e); - } - catch (ClassNotFoundException e) { - throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " + - "(make sure same versions of all classes are available on all nodes or " + - "enable peer-class-loading): " + clsLdr, e); - } - finally { - OptimizedObjectStreamExtRegistry.closeIn(objIn); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <T> T unmarshal(byte[] arr, @Nullable ClassLoader clsLdr) throws IgniteCheckedException { - return unmarshal(arr, 0, arr.length, clsLdr); - } - - /** - * Unmarshals object from byte array using given class loader and offset with len. - * - * @param <T> Type of unmarshalled object. - * @param arr Byte array. - * @param off Object's offset in the array. - * @param len Object's length in the array. - * @param clsLdr Class loader to use. - * @return Unmarshalled object. - * @throws IgniteCheckedException If unmarshalling failed. - */ - public <T> T unmarshal(byte[] arr, int off, int len, @Nullable ClassLoader clsLdr) throws IgniteCheckedException { - assert arr != null; - - OptimizedObjectInputStreamExt objIn = null; - - try { - objIn = OptimizedObjectStreamExtRegistry.in(); - - objIn.context(clsMap, ctx, mapper, clsLdr != null ? 
clsLdr : dfltClsLdr, metaHandler); - - objIn.in().bytes(arr, off, len); - - return (T)objIn.readObject(); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e); - } - catch (ClassNotFoundException e) { - throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " + - "(make sure same version of all classes are available on all nodes or" + - " enable peer-class-loading): " + clsLdr, e); - } - finally { - OptimizedObjectStreamExtRegistry.closeIn(objIn); - } - } - - /** - * Checks whether object, serialized to byte array {@code arr}, has a field with name {@code fieldName}. - * - * @param fieldName Field name. - * @param arr Object's serialized form. - * @param off Object's start off. - * @param len Object's len. - * @return {@code true} if field exists. - */ - public boolean hasField(String fieldName, byte[] arr, int off, int len) throws IgniteCheckedException { - assert arr != null && fieldName != null; - - OptimizedObjectInputStreamExt objIn = null; - - try { - objIn = OptimizedObjectStreamExtRegistry.in(); - - objIn.context(clsMap, ctx, mapper, dfltClsLdr, metaHandler); - - objIn.in().bytes(arr, off, len); - - return objIn.hasField(fieldName); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to find field with name: " + fieldName, e); - } - finally { - OptimizedObjectStreamExtRegistry.closeIn(objIn); - } - } - - /** - * Looks up field with the given name and returns it in one of the following representations. If the field is - * serializable and has a footer then it's not deserialized but rather returned wrapped by {@link CacheObjectImpl} - * for future processing. In all other cases the field is fully deserialized. - * - * @param fieldName Field name. - * @param arr Object's serialized form. - * @param off Object's start offset. - * @param len Object's len. - * @param clsLdr Class loader. 
- * @param <T> Expected field class. - * @return Field. - * @throws IgniteFieldNotFoundException In case if there is no such a field. - * @throws IgniteCheckedException In case of error. - */ - public <T> T readField(String fieldName, byte[] arr, int off, int len, @Nullable ClassLoader clsLdr) - throws IgniteCheckedException { - - assert arr != null && fieldName != null; - - OptimizedObjectInputStreamExt objIn = null; - - try { - objIn = OptimizedObjectStreamExtRegistry.in(); - - objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr, metaHandler); - - objIn.in().bytes(arr, off, len); - - return objIn.readField(fieldName); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to find field with name: " + fieldName, e); - } - catch (ClassNotFoundException e) { - throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " + - "(make sure same version of all classes are available on all nodes or" + - " enable peer-class-loading): " + clsLdr, e); - } - finally { - OptimizedObjectStreamExtRegistry.closeIn(objIn); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerIndexingHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerIndexingHandler.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerIndexingHandler.java new file mode 100644 index 0000000..b8f8ad4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerIndexingHandler.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.marshaller.optimized; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.services.*; + +import java.io.*; +import java.util.concurrent.*; + +/** + * Fields indexing handler. + */ +public class OptimizedMarshallerIndexingHandler { + /** */ + private final static OptimizedMarshallerIndexingHandler instance = new OptimizedMarshallerIndexingHandler(); + + /** */ + private final static ConcurrentHashMap<Class<?>, Boolean> indexingEnabledCache = new ConcurrentHashMap<>(); + + /** Class descriptors by class. */ + private ConcurrentMap<Class, OptimizedClassDescriptor> clsMap; + + /** Metadata handler. */ + private volatile OptimizedMarshallerMetaHandler metaHandler; + + /** ID mapper. */ + private OptimizedMarshallerIdMapper mapper; + + /** Marshaller context. */ + private MarshallerContext ctx; + + /** Protocol version. */ + private OptimizedMarshallerProtocolVersion protocolVer; + + /** + * Sets metadata handler. + * + * @param metaHandler Metadata handler. + */ + public void setMetaHandler(OptimizedMarshallerMetaHandler metaHandler) { + this.metaHandler = metaHandler; + } + + /** + * Returns metadata handler. + * + * @return Metadata handler. + */ + public OptimizedMarshallerMetaHandler metaHandler() { + return metaHandler; + } + + /** + * Sets marshaller context class map. 
+ * @param clsMap Class map. + */ + public void setClassMap(ConcurrentMap<Class, OptimizedClassDescriptor> clsMap) { + this.clsMap = clsMap; + } + + /** + * Sets marshaller ID mapper. + * + * @param mapper ID mapper. + */ + public void setIdMapper(OptimizedMarshallerIdMapper mapper) { + this.mapper = mapper; + } + + /** + * Returns ID mapper. + * + * @return ID mapper. + */ + public OptimizedMarshallerIdMapper idMapper() { + return mapper; + } + + /** + * Sets marshaller context. + * + * @param ctx Marshaller context. + */ + public void setMarshallerCtx(MarshallerContext ctx) { + this.ctx = ctx; + } + + /** + * Sets marshaller protocol version. + * + * @param protocolVer Protocol version. + */ + public void setProtocolVersion(OptimizedMarshallerProtocolVersion protocolVer) { + this.protocolVer = protocolVer; + } + + /** + * Checks whether this functionality is globally supported. + * + * @return {@code true} if enabled. + */ + public boolean isFieldsIndexingSupported() { + return protocolVer != OptimizedMarshallerProtocolVersion.VER_1; + } + + /** + * Enables fields indexing for the object of the given {@code cls}. + * + * If enabled then a footer will be added during marshalling of an object of the given {@code cls} to the end of + * its serialized form. + * + * @param cls Class. + * @return {@code true} if fields indexing is enabled. + * @throws IgniteCheckedException In case of error. 
+ */ + public boolean enableFieldsIndexingForClass(Class<?> cls) throws IgniteCheckedException { + if (metaHandler == null) + return false; + + boolean res; + + if (isFieldsIndexingExcludedForClass(cls)) + res = false; + else if (OptimizedMarshalAware.class.isAssignableFrom(cls)) + res = true; + else { + try { + OptimizedClassDescriptor desc = OptimizedMarshallerUtils.classDescriptor(clsMap, cls, ctx, mapper, + this); + + if (desc.fields() != null && desc.fields().fieldsIndexingSupported()) { + OptimizedObjectMetadata meta = new OptimizedObjectMetadata(); + + for (OptimizedClassDescriptor.ClassFields clsFields : desc.fields().fieldsList()) + for (OptimizedClassDescriptor.FieldInfo info : clsFields.fieldInfoList()) + meta.addField(info.name(), info.type()); + + metaHandler.addMeta(desc.typeId(), meta); + + res = true; + } + else + res = false; + + } catch (IOException e) { + throw new IgniteCheckedException("Failed to put meta for class: " + cls.getName(), e); + } + } + + synchronized (indexingEnabledCache) { + indexingEnabledCache.put(cls, res); + } + + return res; + } + + /** + * Checks whether fields indexing is enabled for objects of the given {@code cls}. + * + * @param cls Class. + * @return {@code true} if fields indexing is enabled. 
+ */ + public boolean fieldsIndexingEnabledForClass(Class<?> cls) { + if (metaHandler == null) + return false; + + Boolean res = indexingEnabledCache.get(cls); + + if (res != null) + return res; + + if (isFieldsIndexingExcludedForClass(cls)) + res = false; + else if (OptimizedMarshalAware.class.isAssignableFrom(cls)) + res = true; + else { + try { + OptimizedClassDescriptor desc = OptimizedMarshallerUtils.classDescriptor(clsMap, cls, ctx, mapper, + this); + + res = desc.fields() != null && desc.fields().fieldsIndexingSupported() && metaHandler != null && + metaHandler.metadata(desc.typeId()) != null; + } catch (IOException e) { + throw new IgniteException("Failed to load class description: " + cls); + } + } + + synchronized (indexingEnabledCache) { + indexingEnabledCache.putIfAbsent(cls, res); + } + + return res; + } + + /** + * Checks whether fields indexing is excluded for class. + * + * @param cls Class. + * @return {@code true} if excluded. + */ + private boolean isFieldsIndexingExcludedForClass(Class<?> cls) { + return ctx.isSystemType(cls.getName()) || Service.class.isAssignableFrom(cls) || + ComputeTask.class.isAssignableFrom(cls); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerProtocolVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerProtocolVersion.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerProtocolVersion.java new file mode 100644 index 0000000..0dbbbe0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerProtocolVersion.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.marshaller.optimized; + +/** + * Optimized marshaller protocol versions. + */ +public enum OptimizedMarshallerProtocolVersion { + /** Initial version. */ + VER_1, + + /** + * Footer addition during marshalling. Footer is used to retrieve object fields without full object + * deserialization. 
+ */ + VER_1_1; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java index cd25f1c..420da5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java @@ -142,10 +142,28 @@ public class OptimizedMarshallerUtils { static final byte EXTERNALIZABLE = 101; /** */ - public static final byte SERIALIZABLE = 102; + static final byte SERIALIZABLE = 102; /** */ - public static final byte MARSHAL_AWARE = 103; + static final byte MARSHAL_AWARE = 103; + + /** */ + static final byte EMPTY_FOOTER = -1; + + /** */ + static final byte FOOTER_LEN_OFF = 2; + + /** */ + static final byte FOOTER_HANDLES_FLAG_OFF = 3; + + /** */ + static final int FOOTER_BODY_OFF_MASK = 0x3FFFFFFF; + + /** */ + static final int FOOTER_BODY_IS_HANDLE_MASK = 0x40000000; + + /** */ + static final byte FOOTER_BODY_HANDLE_MASK_BIT = 30; /** UTF-8 character name. */ static final Charset UTF_8 = Charset.forName("UTF-8"); @@ -175,6 +193,7 @@ public class OptimizedMarshallerUtils { * @param cls Class. * @param ctx Context. * @param mapper ID mapper. + * @param idxHandler Fields indexing handler. * @return Descriptor. * @throws IOException In case of error. 
*/ @@ -182,7 +201,8 @@ public class OptimizedMarshallerUtils { ConcurrentMap<Class, OptimizedClassDescriptor> clsMap, Class cls, MarshallerContext ctx, - OptimizedMarshallerIdMapper mapper) + OptimizedMarshallerIdMapper mapper, + OptimizedMarshallerIndexingHandler idxHandler) throws IOException { OptimizedClassDescriptor desc = clsMap.get(cls); @@ -199,7 +219,7 @@ public class OptimizedMarshallerUtils { throw new IOException("Failed to register class: " + cls.getName(), e); } - desc = new OptimizedClassDescriptor(cls, registered ? typeId : 0, clsMap, ctx, mapper); + desc = new OptimizedClassDescriptor(cls, registered ? typeId : 0, clsMap, ctx, mapper, idxHandler); if (registered) { OptimizedClassDescriptor old = clsMap.putIfAbsent(cls, desc); @@ -270,6 +290,7 @@ public class OptimizedMarshallerUtils { * @param ldr Class loader. * @param ctx Context. * @param mapper ID mapper. + * @param idxHandler Fields indexing handler. * @return Descriptor. * @throws IOException In case of error. * @throws ClassNotFoundException If class was not found. 
@@ -279,7 +300,8 @@ public class OptimizedMarshallerUtils { int id, ClassLoader ldr, MarshallerContext ctx, - OptimizedMarshallerIdMapper mapper) throws IOException, ClassNotFoundException { + OptimizedMarshallerIdMapper mapper, + OptimizedMarshallerIndexingHandler idxHandler) throws IOException, ClassNotFoundException { Class cls; try { @@ -293,7 +315,8 @@ public class OptimizedMarshallerUtils { if (desc == null) { OptimizedClassDescriptor old = clsMap.putIfAbsent(cls, desc = - new OptimizedClassDescriptor(cls, resolveTypeId(cls.getName(), mapper), clsMap, ctx, mapper)); + new OptimizedClassDescriptor(cls, resolveTypeId(cls.getName(), mapper), clsMap, ctx, mapper, + idxHandler)); if (old != null) desc = old; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java index 5a35586..69d5e88 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java @@ -32,7 +32,6 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.*; -import static org.apache.ignite.marshaller.optimized.OptimizedMarshallerExt.*; /** * Optimized object input stream. 
@@ -45,22 +44,22 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt private static final Object DUMMY = new Object(); /** */ - protected MarshallerContext ctx; + private MarshallerContext ctx; /** */ - protected OptimizedMarshallerIdMapper mapper; + private OptimizedMarshallerIdMapper mapper; /** */ - protected ClassLoader clsLdr; + private ClassLoader clsLdr; /** */ - protected OptimizedMarshallerMetaHandler metaHandler; + private GridDataInput in; /** */ - protected GridDataInput in; + private ConcurrentMap<Class, OptimizedClassDescriptor> clsMap; /** */ - protected ConcurrentMap<Class, OptimizedClassDescriptor> clsMap; + private OptimizedMarshallerIndexingHandler idxHandler; /** */ private Object curObj; @@ -96,36 +95,21 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt * @param ctx Context. * @param mapper ID mapper. * @param clsLdr Class loader. + * @param idxHandler Fields indexing handler. */ protected void context( ConcurrentMap<Class, OptimizedClassDescriptor> clsMap, MarshallerContext ctx, OptimizedMarshallerIdMapper mapper, - ClassLoader clsLdr) + ClassLoader clsLdr, + OptimizedMarshallerIndexingHandler idxHandler + ) { this.clsMap = clsMap; this.ctx = ctx; this.mapper = mapper; this.clsLdr = clsLdr; - } - - /** - * @param clsMap Class descriptors by class map. - * @param ctx Context. - * @param mapper ID mapper. - * @param clsLdr Class loader. - * @param metaHandler Metadata handler. 
- */ - protected void context( - ConcurrentMap<Class, OptimizedClassDescriptor> clsMap, - MarshallerContext ctx, - OptimizedMarshallerIdMapper mapper, - ClassLoader clsLdr, - OptimizedMarshallerMetaHandler metaHandler) - { - context(clsMap, ctx, mapper, clsLdr); - - this.metaHandler = metaHandler; + this.idxHandler = idxHandler; } /** @@ -149,7 +133,7 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt ctx = null; clsLdr = null; clsMap = null; - metaHandler = null; + idxHandler = null; } /** {@inheritDoc} */ @@ -283,8 +267,8 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt int typeId = readInt(); OptimizedClassDescriptor desc = typeId == 0 ? - classDescriptor(clsMap, U.forName(readUTF(), clsLdr), ctx, mapper): - classDescriptor(clsMap, typeId, clsLdr, ctx, mapper); + classDescriptor(clsMap, U.forName(readUTF(), clsLdr), ctx, mapper, idxHandler): + classDescriptor(clsMap, typeId, clsLdr, ctx, mapper, idxHandler); curCls = desc.describedClass(); @@ -314,7 +298,7 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt int compTypeId = readInt(); return compTypeId == 0 ? 
U.forName(readUTF(), clsLdr) : - classDescriptor(clsMap, compTypeId, clsLdr, ctx, mapper).describedClass(); + classDescriptor(clsMap, compTypeId, clsLdr, ctx, mapper, idxHandler).describedClass(); } /** @@ -644,7 +628,7 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt int handle = handles.assign(obj); - OptimizedObjectMetadata meta = metaHandler.metadata(typeId); + OptimizedObjectMetadata meta = idxHandler.metaHandler().metadata(typeId); assert meta != null; @@ -705,7 +689,7 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt int handle = handles.assign(obj); - boolean hasFooter = hasFooter(obj.getClass()); + boolean hasFooter = idxHandler.isFieldsIndexingSupported() && in.readBoolean(); for (int i = 0; i < mtds.size(); i++) { Method mtd = mtds.get(i); @@ -1271,35 +1255,26 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt } /** - * Checks whether objects of the {@code cls} have footer. - * - * @param cls Class. - * @return {@code true} if has. - * @throws IOException In case of error. - */ - protected boolean hasFooter(Class<?> cls) throws IOException { - return false; - } - - /** - * Skips object footer from the underlying stream. - * - * @throws IOException In case of error. + * Skips footer. */ protected void skipFooter() throws IOException { - // No-op + short footerLen = in.readShort(); + + if (footerLen != EMPTY_FOOTER) + in.skipBytes(footerLen - 2); } /** - * Reads field's type during its deserialization. + * Reads field type. * * @return Field type. * @throws IOException In case of error. */ protected int readFieldType() throws IOException { - return 0; + return in.readByte(); } + /** * Checks whether the object has a field with name {@code fieldName}. 
* @@ -1352,7 +1327,7 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt if (range != null && range.start >= 0) { byte fieldType = in.readByte(range.start); - if ((fieldType == SERIALIZABLE && metaHandler.metadata(in.readInt(range.start + 1)) != null) + if ((fieldType == SERIALIZABLE && idxHandler.metaHandler().metadata(in.readInt(range.start + 1)) != null) || fieldType == MARSHAL_AWARE) return (F)new CacheIndexedObjectImpl(in.array(), range.start, range.len); else { @@ -1394,7 +1369,7 @@ public class OptimizedObjectInputStream extends ObjectInputStream implements Opt in.position(oldPos); } - OptimizedObjectMetadata meta = metaHandler.metadata(typeId); + OptimizedObjectMetadata meta = idxHandler.metaHandler().metadata(typeId); if (meta == null) // TODO: IGNITE-950 add warning! http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStreamExt.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStreamExt.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStreamExt.java deleted file mode 100644 index 226ea20..0000000 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStreamExt.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.marshaller.optimized; - -import org.apache.ignite.internal.util.io.*; - -import java.io.*; - -import static org.apache.ignite.marshaller.optimized.OptimizedMarshallerExt.*; - - -/** - * TODO: IGNITE-950 - */ -public class OptimizedObjectInputStreamExt extends OptimizedObjectInputStream { - /** {@inheritDoc} */ - public OptimizedObjectInputStreamExt(GridDataInput in) throws IOException { - super(in); - } - - /** {@inheritDoc} */ - @Override protected boolean hasFooter(Class<?> cls) throws IOException { - return fieldsIndexingSupported(cls, metaHandler, ctx, clsMap, mapper); - } - - /** {@inheritDoc} */ - @Override protected void skipFooter() throws IOException { - short footerLen = in.readShort(); - - if (footerLen != EMPTY_FOOTER) - in.skipBytes(footerLen - 2); - } - - /** {@inheritDoc} */ - @Override protected int readFieldType() throws IOException { - return in.readByte(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java index c697c2e..b4be275 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java +++ 
b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java @@ -28,7 +28,6 @@ import org.jetbrains.annotations.*; import java.io.*; import java.lang.reflect.*; import java.util.*; -import java.util.Date; import java.util.concurrent.*; import static org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.*; @@ -45,22 +44,22 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O ); /** */ - protected final GridDataOutput out; + private final GridDataOutput out; /** */ - protected MarshallerContext ctx; + private MarshallerContext ctx; /** */ - protected OptimizedMarshallerIdMapper mapper; + private OptimizedMarshallerIdMapper mapper; /** */ - protected ConcurrentMap<Class, OptimizedClassDescriptor> clsMap; + private ConcurrentMap<Class, OptimizedClassDescriptor> clsMap; /** */ - protected OptimizedMarshallerMetaHandler metaHandler; + private OptimizedMarshallerIndexingHandler idxHandler; /** */ - protected boolean requireSer; + private boolean requireSer; /** */ private final GridHandleTable handles = new GridHandleTable(10, 3.00f); @@ -93,32 +92,18 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O * @param ctx Context. * @param mapper ID mapper. * @param requireSer Require {@link Serializable} flag. + * @param idxHandler Fields indexing handler. */ protected void context(ConcurrentMap<Class, OptimizedClassDescriptor> clsMap, MarshallerContext ctx, OptimizedMarshallerIdMapper mapper, - boolean requireSer) { + boolean requireSer, + OptimizedMarshallerIndexingHandler idxHandler) { this.clsMap = clsMap; this.ctx = ctx; this.mapper = mapper; this.requireSer = requireSer; - } - - /** - * @param clsMap Class descriptors by class map. - * @param ctx Context. - * @param mapper ID mapper. - * @param requireSer Require {@link Serializable} flag. - * @param metaHandler Metadata handler. 
- */ - protected void context(ConcurrentMap<Class, OptimizedClassDescriptor> clsMap, - MarshallerContext ctx, - OptimizedMarshallerIdMapper mapper, - boolean requireSer, - OptimizedMarshallerMetaHandler metaHandler) { - context(clsMap, ctx, mapper, requireSer); - - this.metaHandler = metaHandler; + this.idxHandler = idxHandler; } /** @@ -141,7 +126,7 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O ctx = null; clsMap = null; - metaHandler = null; + idxHandler = null; } /** {@inheritDoc} */ @@ -214,7 +199,8 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O clsMap, obj instanceof Object[] ? Object[].class : obj.getClass(), ctx, - mapper); + mapper, + idxHandler); if (desc.excluded()) { writeByte(NULL); @@ -241,7 +227,8 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O desc = classDescriptor(clsMap, obj instanceof Object[] ? Object[].class : obj.getClass(), ctx, - mapper); + mapper, + idxHandler); } if (handle >= 0) { @@ -340,13 +327,11 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O * @throws IOException In case of error. */ void writeMarshalAware(Object obj) throws IOException { - Footer footer = createFooter(obj.getClass()); + if (!idxHandler.isFieldsIndexingSupported()) + throw new IOException("Failed to marshal OptimizedMarshalAware object. Optimized marshaller protocol " + + "version must be no less then OptimizedMarshallerProtocolVersion.VER_1_1."); - if (footer == null) - throw new IOException("Failed to marshal OptimizedMarshalAware object. 
OptimizedMarshallerExt must be " + - "set to IgniteConfiguration [obj=" + obj.getClass().getName() + "]"); - - footer.indexingSupported(true); + Footer footer = new Footer(); if (marshalAwareFooters == null) marshalAwareFooters = new Stack<>(); @@ -376,10 +361,17 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O @SuppressWarnings("ForLoopReplaceableByForEach") void writeSerializable(Object obj, List<Method> mtds, OptimizedClassDescriptor.Fields fields) throws IOException { - Footer footer = createFooter(obj.getClass()); + Footer footer = null; - if (footer != null) - footer.indexingSupported(fields.fieldsIndexingSupported()); + if (idxHandler.isFieldsIndexingSupported()) { + boolean hasFooter = fields.fieldsIndexingSupported() && + idxHandler.fieldsIndexingEnabledForClass(obj.getClass()); + + out.writeBoolean(hasFooter); + + if (hasFooter) + footer = new Footer(); + } for (int i = 0; i < mtds.size(); i++) { Method mtd = mtds.get(i); @@ -944,7 +936,7 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O * @throws IOException If error. */ protected void writeFieldType(byte type) throws IOException { - // No-op + out.writeByte(type); } /** {@inheritDoc} */ @@ -1094,16 +1086,6 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O } /** - * Creates new instance of {@code Footer}. - * - * @param cls Class. - * @return {@code Footer} instance. - */ - protected Footer createFooter(Class<?> cls) { - return null; - } - - /** * Returns objects that were added to handles table. * Used ONLY for test purposes. * @@ -1211,34 +1193,62 @@ public class OptimizedObjectOutputStream extends ObjectOutputStream implements O /** * Footer that is written at the end of object's serialization. */ - protected interface Footer { - /** - * Whether indexing supported or not. - * - * @param indexingSupported {@code true} if supported. 
- */ - void indexingSupported(boolean indexingSupported); + private class Footer { + /** */ + private ArrayList<Long> data = new ArrayList<>(); + + /** */ + private boolean hasHandles; /** * Adds offset of a field that must be placed next to the footer. * * @param off Field offset. */ - void addNextFieldOff(int off); + public void addNextFieldOff(int off) { + data.add((long)(off & ~FOOTER_BODY_IS_HANDLE_MASK)); + } /** * Adds handle's offset of a field that must be placed next to the footer. * * @param handleOff Handle offset. - * @param handleLen Handle length. + * @param handleLength Handle length. */ - void addNextHandleField(int handleOff, int handleLen); + public void addNextHandleField(int handleOff, int handleLength) { + hasHandles = true; + + data.add(((long)handleLength << 32) | (handleOff | FOOTER_BODY_IS_HANDLE_MASK)); + } /** * Writes footer content to the OutputStream. * * @throws IOException In case of error. */ - void write() throws IOException; + public void write() throws IOException { + if (data == null) + writeShort(EMPTY_FOOTER); + else { + // +5 - 2 bytes for footer len at the beginning, 2 bytes for footer len at the end, 1 byte for handles + // indicator flag. + short footerLen = (short)(data.size() * (hasHandles ? 8 : 4) + 5); + + writeShort(footerLen); + + if (hasHandles) { + for (long body : data) + writeLong(body); + } + else { + for (long body : data) + writeInt((int)body); + } + + writeByte(hasHandles ? 
1 : 0); + + writeShort(footerLen); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStreamExt.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStreamExt.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStreamExt.java deleted file mode 100644 index ad5b515..0000000 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStreamExt.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.marshaller.optimized; - -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.io.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.marshaller.optimized.OptimizedMarshallerExt.*; -import static org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.*; - -/** - * TODO: IGNITE-950 - */ -public class OptimizedObjectOutputStreamExt extends OptimizedObjectOutputStream { - /** - * Constructor. 
- * - * @param out Output stream. - * @throws IOException In case of error. - */ - protected OptimizedObjectOutputStreamExt(GridDataOutput out) throws IOException { - super(out); - } - - /** {@inheritDoc} */ - @Override protected Footer createFooter(Class<?> cls) { - if (fieldsIndexingSupported(cls, metaHandler, ctx, clsMap, mapper)) - return new FooterImpl(); - - return null; - } - - /** {@inheritDoc} */ - @Override protected void writeFieldType(byte type) throws IOException { - out.writeByte(type); - } - - /** - * - */ - private class FooterImpl implements OptimizedObjectOutputStream.Footer { - /** */ - private ArrayList<Long> data; - - /** */ - private boolean hasHandles; - - /** {@inheritDoc} */ - @Override public void indexingSupported(boolean indexingSupported) { - if (indexingSupported) - data = new ArrayList<>(); - else - data = null; - } - - /** {@inheritDoc} */ - @Override public void addNextFieldOff(int off) { - data.add((long)(off & ~FOOTER_BODY_IS_HANDLE_MASK)); - } - - /** {@inheritDoc} */ - @Override public void addNextHandleField(int handleOff, int handleLength) { - hasHandles = true; - - data.add(((long)handleLength << 32) | (handleOff | FOOTER_BODY_IS_HANDLE_MASK)); - } - - /** {@inheritDoc} */ - @Override public void write() throws IOException { - if (data == null) - writeShort(EMPTY_FOOTER); - else { - // +5 - 2 bytes for footer len at the beginning, 2 bytes for footer len at the end, 1 byte for handles - // indicator flag. - short footerLen = (short)(data.size() * (hasHandles ? 8 : 4) + 5); - - writeShort(footerLen); - - if (hasHandles) { - for (long body : data) - writeLong(body); - } - else { - for (long body : data) - writeInt((int)body); - } - - writeByte(hasHandles ? 
1 : 0); - - writeShort(footerLen); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c7c9fa/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamExtRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamExtRegistry.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamExtRegistry.java deleted file mode 100644 index f26bacb..0000000 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamExtRegistry.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.marshaller.optimized; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.concurrent.*; - -/** - * Storage for object streams. - */ -class OptimizedObjectStreamExtRegistry { - /** Holders. 
*/ - private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>(); - - /** Holders pool. */ - private static BlockingQueue<StreamHolder> pool; - - /** - * Ensures singleton. - */ - private OptimizedObjectStreamExtRegistry() { - // No-op. - } - - /** - * Sets streams pool size. - * - * @param size Streams pool size. - */ - static void poolSize(int size) { - if (size > 0) { - pool = new LinkedBlockingQueue<>(size); - - for (int i = 0; i < size; i++) { - boolean b = pool.offer(new StreamHolder()); - - assert b; - } - } - else - pool = null; - } - - /** - * Gets output stream. - * - * @return Object output stream. - * @throws IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool. - */ - static OptimizedObjectOutputStreamExt out() throws IgniteInterruptedCheckedException { - return holder().acquireOut(); - } - - /** - * Gets input stream. - * - * @return Object input stream. - * @throws IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool. - */ - static OptimizedObjectInputStreamExt in() throws IgniteInterruptedCheckedException { - return holder().acquireIn(); - } - - /** - * Closes and releases output stream. - * - * @param out Object output stream. - */ - static void closeOut(OptimizedObjectOutputStream out) { - U.close(out, null); - - StreamHolder holder = holders.get(); - - holder.releaseOut(); - - if (pool != null) { - holders.remove(); - - boolean b = pool.offer(holder); - - assert b; - } - } - - /** - * Closes and releases input stream. - * - * @param in Object input stream. - */ - @SuppressWarnings("TypeMayBeWeakened") - static void closeIn(OptimizedObjectInputStream in) { - U.close(in, null); - - StreamHolder holder = holders.get(); - - holder.releaseIn(); - - if (pool != null) { - holders.remove(); - - boolean b = pool.offer(holder); - - assert b; - } - } - - /** - * Gets holder from pool or thread local. - * - * @return Stream holder. 
- * @throws IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool. - */ - private static StreamHolder holder() throws IgniteInterruptedCheckedException { - StreamHolder holder = holders.get(); - - if (holder == null) { - try { - holders.set(holder = pool != null ? pool.take() : new StreamHolder()); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException( - "Failed to take object stream from pool (thread interrupted).", e); - } - } - - return holder; - } - - /** - * Streams holder. - */ - private static class StreamHolder { - /** Output stream. */ - private final OptimizedObjectOutputStreamExt out = createOut(); - - /** Input stream. */ - private final OptimizedObjectInputStreamExt in = createIn(); - - /** Output streams counter. */ - private int outAcquireCnt; - - /** Input streams counter. */ - private int inAcquireCnt; - - /** - * Gets output stream. - * - * @return Object output stream. - */ - OptimizedObjectOutputStreamExt acquireOut() { - return outAcquireCnt++ > 0 ? createOut() : out; - } - - /** - * Gets input stream. - * - * @return Object input stream. - */ - OptimizedObjectInputStreamExt acquireIn() { - return inAcquireCnt++ > 0 ? createIn() : in; - } - - /** - * Releases output stream. - */ - void releaseOut() { - outAcquireCnt--; - } - - /** - * Releases input stream. - */ - void releaseIn() { - inAcquireCnt--; - } - - /** - * Creates output stream. - * - * @return Object output stream. - */ - private OptimizedObjectOutputStreamExt createOut() { - try { - return new OptimizedObjectOutputStreamExt(new GridUnsafeDataOutput(4 * 1024)); - } - catch (IOException e) { - throw new IgniteException("Failed to create object output stream.", e); - } - } - - /** - * Creates input stream. - * - * @return Object input stream. 
- */ - private OptimizedObjectInputStreamExt createIn() { - try { - return new OptimizedObjectInputStreamExt(new GridUnsafeDataInput()); - } - catch (IOException e) { - throw new IgniteException("Failed to create object input stream.", e); - } - } - } -}