http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java deleted file mode 100644 index 43d5490..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable.streams; - -import static org.apache.ignite.internal.portable.GridPortableThreadLocalMemoryAllocator.*; - -/** - * Portable heap output stream. - */ -public final class GridPortableHeapOutputStream extends GridPortableAbstractOutputStream { - /** Default capacity. */ - private static final int DFLT_CAP = 1024; - - /** Allocator. */ - private final GridPortableMemoryAllocator alloc; - - /** Data. */ - private byte[] data; - - /** - * Constructor. - */ - public GridPortableHeapOutputStream() { - this(DFLT_CAP, DFLT_ALLOC); - } - - /** - * Constructor. - * - * @param cap Initial capacity. - */ - public GridPortableHeapOutputStream(int cap) { - this(cap, THREAD_LOCAL_ALLOC); - } - - /** - * Constructor. - * - * @param cap Initial capacity. - * @param alloc Allocator. - */ - public GridPortableHeapOutputStream(int cap, GridPortableMemoryAllocator alloc) { - data = alloc.allocate(cap); - - this.alloc = alloc; - } - - /** - * Constructor. - * - * @param data Data. - */ - public GridPortableHeapOutputStream(byte[] data) { - this(data, DFLT_ALLOC); - } - - /** - * Constructor. - * - * @param data Data. - * @param alloc Allocator. - */ - public GridPortableHeapOutputStream(byte[] data, GridPortableMemoryAllocator alloc) { - this.data = data; - this.alloc = alloc; - } - - /** {@inheritDoc} */ - @Override public void close() { - alloc.release(data, pos); - } - - /** {@inheritDoc} */ - @Override public void ensureCapacity(int cnt) { - if (cnt > data.length) { - int newCap = capacity(data.length, cnt); - - data = alloc.reallocate(data, newCap); - } - } - - /** {@inheritDoc} */ - @Override public byte[] array() { - return data; - } - - /** {@inheritDoc} */ - @Override public byte[] arrayCopy() { - byte[] res = new byte[pos]; - - UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean hasArray() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void writeByteAndShift(byte val) { - data[pos++] = val; - } - - /** {@inheritDoc} */ - @Override protected void copyAndShift(Object src, long off, int len) { - UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len); - - shift(len); - } - - /** {@inheritDoc} */ - @Override protected void writeShortFast(short val) { - UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); - } - - /** {@inheritDoc} */ - @Override protected void writeCharFast(char val) { - UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); - } - - /** {@inheritDoc} */ - @Override protected void writeIntFast(int val) { - UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); - } - - /** {@inheritDoc} */ - @Override protected void writeLongFast(long val) { - UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); - } - - /** {@inheritDoc} */ - @Override protected void writeIntPositioned(int pos, int val) { - if (!LITTLE_ENDIAN) - val = Integer.reverseBytes(val); - - UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java deleted file mode 100644 index 4cfbd37..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable.streams; - -/** - * Portable memory allocator. - */ -public interface GridPortableMemoryAllocator { - /** Default memory allocator. */ - public static final GridPortableMemoryAllocator DFLT_ALLOC = new GridPortableSimpleMemoryAllocator(); - - /** - * Allocate memory. - * - * @param size Size. - * @return Data. - */ - public byte[] allocate(int size); - - /** - * Reallocates memory. - * - * @param data Current data chunk. - * @param size New size required. - * - * @return Data. - */ - public byte[] reallocate(byte[] data, int size); - - /** - * Release memory. - * - * @param data Data. - * @param maxMsgSize Max message size sent during the time the allocator is used. - */ - public void release(byte[] data, int maxMsgSize); - - /** - * Allocate memory. - * - * @param size Size. - * @return Address. - */ - public long allocateDirect(int size); - - /** - * Reallocate memory. - * - * @param addr Address. - * @param size Size. - * @return Address. - */ - public long reallocateDirect(long addr, int size); - - /** - * Release memory. - * - * @param addr Address. - */ - public void releaseDirect(long addr); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java deleted file mode 100644 index c65070c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable.streams; - -/** - * Portable off-heap input stream. - */ -public class GridPortableOffheapInputStream extends GridPortableAbstractInputStream { - /** Pointer. */ - private final long ptr; - - /** Capacity. */ - private final int cap; - - /** */ - private boolean forceHeap; - - /** - * Constructor. - * - * @param ptr Pointer. - * @param cap Capacity. - */ - public GridPortableOffheapInputStream(long ptr, int cap) { - this(ptr, cap, false); - } - - /** - * Constructor. - * - * @param ptr Pointer. - * @param cap Capacity. - * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will - * create heap-based objects. - */ - public GridPortableOffheapInputStream(long ptr, int cap, boolean forceHeap) { - this.ptr = ptr; - this.cap = cap; - this.forceHeap = forceHeap; - - len = cap; - } - - /** {@inheritDoc} */ - @Override public int remaining() { - return cap - pos; - } - - /** {@inheritDoc} */ - @Override public byte[] array() { - return arrayCopy(); - } - - /** {@inheritDoc} */ - @Override public byte[] arrayCopy() { - byte[] res = new byte[len]; - - UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean hasArray() { - return false; - } - - /** {@inheritDoc} */ - @Override protected byte readByteAndShift() { - return UNSAFE.getByte(ptr + pos++); - } - - /** {@inheritDoc} */ - @Override protected void copyAndShift(Object target, long off, int len) { - UNSAFE.copyMemory(null, ptr + pos, target, off, len); - - shift(len); - } - - /** {@inheritDoc} */ - @Override protected short readShortFast() { - return UNSAFE.getShort(ptr + pos); - } - - /** {@inheritDoc} */ - @Override protected char readCharFast() { - return UNSAFE.getChar(ptr + pos); - } - - /** {@inheritDoc} */ - @Override protected int readIntFast() { - return UNSAFE.getInt(ptr + pos); - } - - /** {@inheritDoc} */ - @Override protected long readLongFast() { - return UNSAFE.getLong(ptr + pos); - } - - /** {@inheritDoc} */ - @Override protected int readIntPositioned(int pos) { - int res = UNSAFE.getInt(ptr + pos); - - if (!LITTLE_ENDIAN) - res = Integer.reverseBytes(res); - - return res; - } - - /** {@inheritDoc} */ - @Override public long offheapPointer() { - return forceHeap ? 0 : ptr; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java deleted file mode 100644 index 41d49d4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable.streams; - -/** - * Portable offheap output stream. - */ -public class GridPortableOffheapOutputStream extends GridPortableAbstractOutputStream { - /** Pointer. */ - private long ptr; - - /** Length of bytes that cen be used before resize is necessary. */ - private int cap; - - /** - * Constructor. - * - * @param cap Capacity. - */ - public GridPortableOffheapOutputStream(int cap) { - this(0, cap); - } - - /** - * Constructor. - * - * @param ptr Pointer to existing address. - * @param cap Capacity. - */ - public GridPortableOffheapOutputStream(long ptr, int cap) { - this.ptr = ptr == 0 ? allocate(cap) : ptr; - - this.cap = cap; - } - - /** {@inheritDoc} */ - @Override public void close() { - release(ptr); - } - - /** {@inheritDoc} */ - @Override public void ensureCapacity(int cnt) { - if (cnt > cap) { - int newCap = capacity(cap, cnt); - - ptr = reallocate(ptr, newCap); - - cap = newCap; - } - } - - /** {@inheritDoc} */ - @Override public byte[] array() { - return arrayCopy(); - } - - /** {@inheritDoc} */ - @Override public byte[] arrayCopy() { - byte[] res = new byte[pos]; - - UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos); - - return res; - } - - /** - * @return Pointer. - */ - public long pointer() { - return ptr; - } - - /** - * @return Capacity. - */ - public int capacity() { - return cap; - } - - /** {@inheritDoc} */ - @Override protected void writeByteAndShift(byte val) { - UNSAFE.putByte(ptr + pos++, val); - } - - /** {@inheritDoc} */ - @Override protected void copyAndShift(Object src, long offset, int len) { - UNSAFE.copyMemory(src, offset, null, ptr + pos, len); - - shift(len); - } - - /** {@inheritDoc} */ - @Override protected void writeShortFast(short val) { - UNSAFE.putShort(ptr + pos, val); - } - - /** {@inheritDoc} */ - @Override protected void writeCharFast(char val) { - UNSAFE.putChar(ptr + pos, val); - } - - /** {@inheritDoc} */ - @Override protected void writeIntFast(int val) { - UNSAFE.putInt(ptr + pos, val); - } - - /** {@inheritDoc} */ - @Override protected void writeLongFast(long val) { - UNSAFE.putLong(ptr + pos, val); - } - - /** {@inheritDoc} */ - @Override protected void writeIntPositioned(int pos, int val) { - if (!LITTLE_ENDIAN) - val = Integer.reverseBytes(val); - - UNSAFE.putInt(ptr + pos, val); - } - - /** {@inheritDoc} */ - @Override public boolean hasArray() { - return false; - } - - /** - * Allocate memory. - * - * @param cap Capacity. - * @return Pointer. - */ - protected long allocate(int cap) { - return UNSAFE.allocateMemory(cap); - } - - /** - * Reallocate memory. - * - * @param ptr Old pointer. - * @param cap Capacity. - * @return New pointer. - */ - protected long reallocate(long ptr, int cap) { - return UNSAFE.reallocateMemory(ptr, cap); - } - - /** - * Release memory. - * - * @param ptr Pointer. - */ - protected void release(long ptr) { - UNSAFE.freeMemory(ptr); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java deleted file mode 100644 index 8c27bf6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable.streams; - -import org.apache.ignite.internal.util.*; - -import sun.misc.*; - -/** - * Naive implementation of portable memory allocator. - */ -public class GridPortableSimpleMemoryAllocator implements GridPortableMemoryAllocator { - /** Unsafe. */ - private static final Unsafe UNSAFE = GridUnsafe.unsafe(); - - /** Array offset: byte. */ - protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); - - /** {@inheritDoc} */ - @Override public byte[] allocate(int size) { - return new byte[size]; - } - - /** {@inheritDoc} */ - @Override public byte[] reallocate(byte[] data, int size) { - byte[] newData = new byte[size]; - - UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length); - - return newData; - } - - /** {@inheritDoc} */ - @Override public void release(byte[] data, int maxMsgSize) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public long allocateDirect(int size) { - return UNSAFE.allocateMemory(size); - } - - /** {@inheritDoc} */ - @Override public long reallocateDirect(long addr, int size) { - return UNSAFE.reallocateMemory(addr, size); - } - - /** {@inheritDoc} */ - @Override public void releaseDirect(long addr) { - UNSAFE.freeMemory(addr); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java new file mode 100644 index 0000000..9f0b1db --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import org.apache.ignite.portable.*; + +/** + * Portable abstract input stream. + */ +public abstract class PortableAbstractInputStream extends PortableAbstractStream + implements PortableInputStream { + /** Length of data inside array. */ + protected int len; + + /** {@inheritDoc} */ + @Override public byte readByte() { + ensureEnoughData(1); + + return readByteAndShift(); + } + + /** {@inheritDoc} */ + @Override public byte[] readByteArray(int cnt) { + ensureEnoughData(cnt); + + byte[] res = new byte[cnt]; + + copyAndShift(res, BYTE_ARR_OFF, cnt); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() { + return readByte() == BYTE_ONE; + } + + /** {@inheritDoc} */ + @Override public boolean[] readBooleanArray(int cnt) { + ensureEnoughData(cnt); + + boolean[] res = new boolean[cnt]; + + copyAndShift(res, BOOLEAN_ARR_OFF, cnt); + + return res; + } + + /** {@inheritDoc} */ + @Override public short readShort() { + ensureEnoughData(2); + + short res = readShortFast(); + + shift(2); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public short[] readShortArray(int cnt) { + int len = cnt << 1; + + ensureEnoughData(len); + + short[] res = new short[cnt]; + + copyAndShift(res, SHORT_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Short.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public char readChar() { + ensureEnoughData(2); + + char res = readCharFast(); + + shift(2); + + if (!LITTLE_ENDIAN) + res = Character.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public char[] readCharArray(int cnt) { + int len = cnt << 1; + + ensureEnoughData(len); + + char[] res = new char[cnt]; + + copyAndShift(res, CHAR_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Character.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt() { + ensureEnoughData(4); + + int res = readIntFast(); + + shift(4); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public int[] readIntArray(int cnt) { + int len = cnt << 2; + + ensureEnoughData(len); + + int[] res = new int[cnt]; + + copyAndShift(res, INT_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Integer.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt(int pos) { + int delta = pos + 4 - this.pos; + + if (delta > 0) + ensureEnoughData(delta); + + return readIntPositioned(pos); + } + + /** {@inheritDoc} */ + @Override public float readFloat() { + return Float.intBitsToFloat(readInt()); + } + + /** {@inheritDoc} */ + @Override public float[] readFloatArray(int cnt) { + int len = cnt << 2; + + ensureEnoughData(len); + + float[] res = new float[cnt]; + + if (LITTLE_ENDIAN) + copyAndShift(res, FLOAT_ARR_OFF, len); + else { + for (int i = 0; i < res.length; i++) { + int x = readIntFast(); + + shift(4); + + res[i] = Float.intBitsToFloat(Integer.reverseBytes(x)); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public long readLong() { + ensureEnoughData(8); + + long res = readLongFast(); + + shift(8); + + if (!LITTLE_ENDIAN) + res = Long.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public long[] readLongArray(int cnt) { + int len = cnt << 3; + + ensureEnoughData(len); + + long[] res = new long[cnt]; + + copyAndShift(res, LONG_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Long.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public double readDouble() { + return Double.longBitsToDouble(readLong()); + } + + /** {@inheritDoc} */ + @Override public double[] readDoubleArray(int cnt) { + int len = cnt << 3; + + ensureEnoughData(len); + + double[] res = new double[cnt]; + + if (LITTLE_ENDIAN) + copyAndShift(res, DOUBLE_ARR_OFF, len); + else { + for (int i = 0; i < res.length; i++) { + long x = readLongFast(); + + shift(8); + + res[i] = Double.longBitsToDouble(Long.reverseBytes(x)); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int read(byte[] arr, int off, int len) { + if (len > remaining()) + len = remaining(); + + copyAndShift(arr, BYTE_ARR_OFF + off, len); + + return len; + } + + /** {@inheritDoc} */ + @Override public void position(int pos) { + if (remaining() + this.pos < pos) + throw new PortableException("Position is out of bounds: " + pos); + else + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return 0; + } + + /** + * Ensure that there is enough data. + * + * @param cnt Length. + */ + protected void ensureEnoughData(int cnt) { + if (remaining() < cnt) + throw new PortableException("Not enough data to read the value [position=" + pos + + ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']'); + } + + /** + * Read next byte from the stream and perform shift. + * + * @return Next byte. + */ + protected abstract byte readByteAndShift(); + + /** + * Copy data to target object shift position afterwards. + * + * @param target Target. + * @param off Offset. + * @param len Length. + */ + protected abstract void copyAndShift(Object target, long off, int len); + + /** + * Read short value (fast path). + * + * @return Short value. + */ + protected abstract short readShortFast(); + + /** + * Read char value (fast path). + * + * @return Char value. + */ + protected abstract char readCharFast(); + + /** + * Read int value (fast path). + * + * @return Int value. + */ + protected abstract int readIntFast(); + + /** + * Read long value (fast path). + * + * @return Long value. + */ + protected abstract long readLongFast(); + + /** + * Internal routine for positioned int value read. + * + * @param pos Position. + * @return Int value. + */ + protected abstract int readIntPositioned(int pos); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java new file mode 100644 index 0000000..2c3179a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Base portable output stream. + */ +public abstract class PortableAbstractOutputStream extends PortableAbstractStream + implements PortableOutputStream { + /** Minimal capacity when it is reasonable to start doubling resize. */ + private static final int MIN_CAP = 256; + + /** {@inheritDoc} */ + @Override public void writeByte(byte val) { + ensureCapacity(pos + 1); + + writeByteAndShift(val); + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(byte[] val) { + ensureCapacity(pos + val.length); + + copyAndShift(val, BYTE_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean val) { + writeByte(val ? BYTE_ONE : BYTE_ZERO); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(boolean[] val) { + ensureCapacity(pos + val.length); + + copyAndShift(val, BOOLEAN_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) { + ensureCapacity(pos + 2); + + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + writeShortFast(val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(short[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, SHORT_ARR_OFF, cnt); + else { + for (short item : val) + writeShortFast(Short.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) { + ensureCapacity(pos + 2); + + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + writeCharFast(val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(char[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, CHAR_ARR_OFF, cnt); + else { + for (char item : val) + writeCharFast(Character.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) { + ensureCapacity(pos + 4); + + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + writeIntFast(val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int pos, int val) { + ensureCapacity(pos + 4); + + writeIntPositioned(pos, val); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(int[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, INT_ARR_OFF, cnt); + else { + for (int item : val) + writeIntFast(Integer.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeFloat(float val) { + writeInt(Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(float[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, FLOAT_ARR_OFF, cnt); + else { + for (float item : val) { + writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item))); + + shift(4); + } + } + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) { + ensureCapacity(pos + 8); + + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + writeLongFast(val); + + shift(8); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(long[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, LONG_ARR_OFF, cnt); + else { + for (long item : val) + writeLongFast(Long.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double val) { + writeLong(Double.doubleToLongBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(double[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, DOUBLE_ARR_OFF, cnt); + else { + for (double item : val) { + writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item))); + + shift(8); + } + } + } + + /** {@inheritDoc} */ + @Override public void write(byte[] arr, int off, int len) { + ensureCapacity(pos + len); + + copyAndShift(arr, BYTE_ARR_OFF + off, len); + } + + /** {@inheritDoc} */ + @Override public void write(long addr, int cnt) { + ensureCapacity(pos + cnt); + + copyAndShift(null, addr, cnt); + } + + /** {@inheritDoc} */ + @Override public void position(int pos) { + ensureCapacity(pos); + + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return 0; + } + + /** + * Calculate new capacity. + * + * @param curCap Current capacity. + * @param reqCap Required capacity. + * @return New capacity. + */ + protected static int capacity(int curCap, int reqCap) { + int newCap; + + if (reqCap < MIN_CAP) + newCap = MIN_CAP; + else { + newCap = curCap << 1; + + if (newCap < reqCap) + newCap = reqCap; + } + + return newCap; + } + + /** + * Write next byte to the stream. + * + * @param val Value. + */ + protected abstract void writeByteAndShift(byte val); + + /** + * Copy source object to the stream shift position afterwards. + * + * @param src Source. + * @param off Offset. + * @param len Length. + */ + protected abstract void copyAndShift(Object src, long off, int len); + + /** + * Write short value (fast path). + * + * @param val Short value. + */ + protected abstract void writeShortFast(short val); + + /** + * Write char value (fast path). + * + * @param val Char value. + */ + protected abstract void writeCharFast(char val); + + /** + * Write int value (fast path). + * + * @param val Int value. + */ + protected abstract void writeIntFast(int val); + + /** + * Write long value (fast path). + * + * @param val Long value. + */ + protected abstract void writeLongFast(long val); + + /** + * Write int value to the given position. + * + * @param pos Position. + * @param val Value. + */ + protected abstract void writeIntPositioned(int pos, int val); + + /** + * Ensure capacity. + * + * @param cnt Required byte count. + */ + protected abstract void ensureCapacity(int cnt); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java new file mode 100644 index 0000000..a39662b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import org.apache.ignite.internal.util.*; + +import sun.misc.*; + +import java.nio.*; + +/** + * Portable abstract stream. + */ +public abstract class PortableAbstractStream implements PortableStream { + /** Byte: zero. */ + protected static final byte BYTE_ZERO = 0; + + /** Byte: one. */ + protected static final byte BYTE_ONE = 1; + + /** Whether little endian is used on the platform. */ + protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + + /** Unsafe instance. */ + protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: boolean. */ + protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); + + /** Array offset: byte. */ + protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Array offset: short. */ + protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); + + /** Array offset: char. */ + protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); + + /** Array offset: int. */ + protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); + + /** Array offset: float. */ + protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); + + /** Array offset: long. */ + protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); + + /** Array offset: double. */ + protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); + + /** Position. */ + protected int pos; + + /** {@inheritDoc} */ + @Override public int position() { + return pos; + } + + /** + * Shift position. + * + * @param cnt Byte count. + */ + protected void shift(int cnt) { + pos += cnt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java new file mode 100644 index 0000000..3baa6a5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import java.util.*; + +/** + * Portable off-heap input stream. + */ +public final class PortableHeapInputStream extends PortableAbstractInputStream { + /** Data. */ + private byte[] data; + + /** + * Constructor. + * + * @param data Data. + */ + public PortableHeapInputStream(byte[] data) { + this.data = data; + + len = data.length; + } + + /** + * @return Copy of this stream. + */ + public PortableHeapInputStream copy() { + PortableHeapInputStream in = new PortableHeapInputStream(Arrays.copyOf(data, data.length)); + + in.position(pos); + + return in; + } + + /** + * Method called from JNI to resize stream. + * + * @param len Required length. + * @return Underlying byte array. + */ + public byte[] resize(int len) { + if (data.length < len) { + byte[] data0 = new byte[len]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, data0, BYTE_ARR_OFF, data.length); + + data = data0; + } + + return data; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return data.length - pos; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[len]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, res.length); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override protected byte readByteAndShift() { + return data[pos++]; + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object target, long off, int len) { + UNSAFE.copyMemory(data, BYTE_ARR_OFF + pos, target, off, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected short readShortFast() { + return UNSAFE.getShort(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected char readCharFast() { + return UNSAFE.getChar(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntFast() { + return UNSAFE.getInt(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected long readLongFast() { + return UNSAFE.getLong(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned(int pos) { + int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java new file mode 100644 index 0000000..f492449 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import static org.apache.ignite.internal.portable.PortableThreadLocalMemoryAllocator.*; + +/** + * Portable heap output stream. + */ +public final class PortableHeapOutputStream extends PortableAbstractOutputStream { + /** Default capacity. */ + private static final int DFLT_CAP = 1024; + + /** Allocator. */ + private final PortableMemoryAllocator alloc; + + /** Data. */ + private byte[] data; + + /** + * Constructor. + */ + public PortableHeapOutputStream() { + this(DFLT_CAP, DFLT_ALLOC); + } + + /** + * Constructor. + * + * @param cap Initial capacity. + */ + public PortableHeapOutputStream(int cap) { + this(cap, THREAD_LOCAL_ALLOC); + } + + /** + * Constructor. + * + * @param cap Initial capacity. + * @param alloc Allocator. + */ + public PortableHeapOutputStream(int cap, PortableMemoryAllocator alloc) { + data = alloc.allocate(cap); + + this.alloc = alloc; + } + + /** + * Constructor. + * + * @param data Data. + */ + public PortableHeapOutputStream(byte[] data) { + this(data, DFLT_ALLOC); + } + + /** + * Constructor. + * + * @param data Data. + * @param alloc Allocator. + */ + public PortableHeapOutputStream(byte[] data, PortableMemoryAllocator alloc) { + this.data = data; + this.alloc = alloc; + } + + /** {@inheritDoc} */ + @Override public void close() { + alloc.release(data, pos); + } + + /** {@inheritDoc} */ + @Override public void ensureCapacity(int cnt) { + if (cnt > data.length) { + int newCap = capacity(data.length, cnt); + + data = alloc.reallocate(data, newCap); + } + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[pos]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void writeByteAndShift(byte val) { + data[pos++] = val; + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object src, long off, int len) { + UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected void writeShortFast(short val) { + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeCharFast(char val) { + UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntFast(int val) { + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeLongFast(long val) { + UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntPositioned(int pos, int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java new file mode 100644 index 0000000..cd6e039 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Portable input stream. + */ +public interface PortableInputStream extends PortableStream { + /** + * Read byte value. + * + * @return Byte value. + */ + public byte readByte(); + + /** + * Read byte array. + * + * @param cnt Expected item count. + * @return Byte array. + */ + public byte[] readByteArray(int cnt); + + /** + * Reads {@code cnt} of bytes into byte array. + * + * @param arr Expected item count. + * @param off offset + * @param cnt number of bytes to read. + * @return actual length read. + */ + public int read(byte[] arr, int off, int cnt); + + /** + * Read boolean value. + * + * @return Boolean value. + */ + public boolean readBoolean(); + + /** + * Read boolean array. + * + * @param cnt Expected item count. + * @return Boolean array. + */ + public boolean[] readBooleanArray(int cnt); + + /** + * Read short value. + * + * @return Short value. + */ + public short readShort(); + + /** + * Read short array. + * + * @param cnt Expected item count. + * @return Short array. + */ + public short[] readShortArray(int cnt); + + /** + * Read char value. + * + * @return Char value. + */ + public char readChar(); + + /** + * Read char array. + * + * @param cnt Expected item count. + * @return Char array. + */ + public char[] readCharArray(int cnt); + + /** + * Read int value. + * + * @return Int value. + */ + public int readInt(); + + /** + * Read int value at the given position. + * + * @param pos Position. + * @return Value. + */ + public int readInt(int pos); + + /** + * Read int array. + * + * @param cnt Expected item count. + * @return Int array. + */ + public int[] readIntArray(int cnt); + + /** + * Read float value. + * + * @return Float value. + */ + public float readFloat(); + + /** + * Read float array. + * + * @param cnt Expected item count. + * @return Float array. + */ + public float[] readFloatArray(int cnt); + + /** + * Read long value. + * + * @return Long value. + */ + public long readLong(); + + /** + * Read long array. + * + * @param cnt Expected item count. + * @return Long array. + */ + public long[] readLongArray(int cnt); + + /** + * Read double value. + * + * @return Double value. + */ + public double readDouble(); + + /** + * Read double array. + * + * @param cnt Expected item count. + * @return Double array. + */ + public double[] readDoubleArray(int cnt); + + /** + * Gets amount of remaining data in bytes. + * + * @return Remaining data. + */ + public int remaining(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java new file mode 100644 index 0000000..071396a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Portable memory allocator. + */ +public interface PortableMemoryAllocator { + /** Default memory allocator. */ + public static final PortableMemoryAllocator DFLT_ALLOC = new PortableSimpleMemoryAllocator(); + + /** + * Allocate memory. + * + * @param size Size. + * @return Data. + */ + public byte[] allocate(int size); + + /** + * Reallocates memory. + * + * @param data Current data chunk. + * @param size New size required. + * + * @return Data. + */ + public byte[] reallocate(byte[] data, int size); + + /** + * Release memory. + * + * @param data Data. + * @param maxMsgSize Max message size sent during the time the allocator is used. + */ + public void release(byte[] data, int maxMsgSize); + + /** + * Allocate memory. + * + * @param size Size. + * @return Address. + */ + public long allocateDirect(int size); + + /** + * Reallocate memory. + * + * @param addr Address. + * @param size Size. + * @return Address. + */ + public long reallocateDirect(long addr, int size); + + /** + * Release memory. + * + * @param addr Address. + */ + public void releaseDirect(long addr); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java new file mode 100644 index 0000000..bfdd97a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Portable off-heap input stream. + */ +public class PortableOffheapInputStream extends PortableAbstractInputStream { + /** Pointer. */ + private final long ptr; + + /** Capacity. */ + private final int cap; + + /** */ + private boolean forceHeap; + + /** + * Constructor. + * + * @param ptr Pointer. + * @param cap Capacity. + */ + public PortableOffheapInputStream(long ptr, int cap) { + this(ptr, cap, false); + } + + /** + * Constructor. + * + * @param ptr Pointer. + * @param cap Capacity. + * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will + * create heap-based objects. + */ + public PortableOffheapInputStream(long ptr, int cap, boolean forceHeap) { + this.ptr = ptr; + this.cap = cap; + this.forceHeap = forceHeap; + + len = cap; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return cap - pos; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return arrayCopy(); + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[len]; + + UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Override protected byte readByteAndShift() { + return UNSAFE.getByte(ptr + pos++); + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object target, long off, int len) { + UNSAFE.copyMemory(null, ptr + pos, target, off, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected short readShortFast() { + return UNSAFE.getShort(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected char readCharFast() { + return UNSAFE.getChar(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntFast() { + return UNSAFE.getInt(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected long readLongFast() { + return UNSAFE.getLong(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned(int pos) { + int res = UNSAFE.getInt(ptr + pos); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return forceHeap ? 0 : ptr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java new file mode 100644 index 0000000..adfb6bf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Portable offheap output stream. + */ +public class PortableOffheapOutputStream extends PortableAbstractOutputStream { + /** Pointer. */ + private long ptr; + + /** Length of bytes that cen be used before resize is necessary. */ + private int cap; + + /** + * Constructor. + * + * @param cap Capacity. + */ + public PortableOffheapOutputStream(int cap) { + this(0, cap); + } + + /** + * Constructor. + * + * @param ptr Pointer to existing address. + * @param cap Capacity. + */ + public PortableOffheapOutputStream(long ptr, int cap) { + this.ptr = ptr == 0 ? allocate(cap) : ptr; + + this.cap = cap; + } + + /** {@inheritDoc} */ + @Override public void close() { + release(ptr); + } + + /** {@inheritDoc} */ + @Override public void ensureCapacity(int cnt) { + if (cnt > cap) { + int newCap = capacity(cap, cnt); + + ptr = reallocate(ptr, newCap); + + cap = newCap; + } + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return arrayCopy(); + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[pos]; + + UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos); + + return res; + } + + /** + * @return Pointer. + */ + public long pointer() { + return ptr; + } + + /** + * @return Capacity. + */ + public int capacity() { + return cap; + } + + /** {@inheritDoc} */ + @Override protected void writeByteAndShift(byte val) { + UNSAFE.putByte(ptr + pos++, val); + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object src, long offset, int len) { + UNSAFE.copyMemory(src, offset, null, ptr + pos, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected void writeShortFast(short val) { + UNSAFE.putShort(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeCharFast(char val) { + UNSAFE.putChar(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntFast(int val) { + UNSAFE.putInt(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeLongFast(long val) { + UNSAFE.putLong(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntPositioned(int pos, int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return false; + } + + /** + * Allocate memory. + * + * @param cap Capacity. + * @return Pointer. + */ + protected long allocate(int cap) { + return UNSAFE.allocateMemory(cap); + } + + /** + * Reallocate memory. + * + * @param ptr Old pointer. + * @param cap Capacity. + * @return New pointer. + */ + protected long reallocate(long ptr, int cap) { + return UNSAFE.reallocateMemory(ptr, cap); + } + + /** + * Release memory. + * + * @param ptr Pointer. + */ + protected void release(long ptr) { + UNSAFE.freeMemory(ptr); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java new file mode 100644 index 0000000..f320566 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Portable output stream. + */ +public interface PortableOutputStream extends PortableStream, AutoCloseable { + /** + * Write byte value. + * + * @param val Byte value. + */ + public void writeByte(byte val); + + /** + * Write byte array. + * + * @param val Byte array. + */ + public void writeByteArray(byte[] val); + + /** + * Write boolean value. + * + * @param val Boolean value. + */ + public void writeBoolean(boolean val); + + /** + * Write boolean array. + * + * @param val Boolean array. + */ + public void writeBooleanArray(boolean[] val); + + /** + * Write short value. + * + * @param val Short value. + */ + public void writeShort(short val); + + /** + * Write short array. + * + * @param val Short array. + */ + public void writeShortArray(short[] val); + + /** + * Write char value. + * + * @param val Char value. + */ + public void writeChar(char val); + + /** + * Write char array. + * + * @param val Char array. + */ + public void writeCharArray(char[] val); + + /** + * Write int value. + * + * @param val Int value. + */ + public void writeInt(int val); + + /** + * Write int value to the given position. + * + * @param pos Position. + * @param val Value. + */ + public void writeInt(int pos, int val); + + /** + * Write int array. + * + * @param val Int array. + */ + public void writeIntArray(int[] val); + + /** + * Write float value. + * + * @param val Float value. + */ + public void writeFloat(float val); + + /** + * Write float array. + * + * @param val Float array. + */ + public void writeFloatArray(float[] val); + + /** + * Write long value. + * + * @param val Long value. + */ + public void writeLong(long val); + + /** + * Write long array. + * + * @param val Long array. + */ + public void writeLongArray(long[] val); + + /** + * Write double value. + * + * @param val Double value. + */ + public void writeDouble(double val); + + /** + * Write double array. + * + * @param val Double array. + */ + public void writeDoubleArray(double[] val); + + /** + * Write byte array. + * + * @param arr Array. + * @param off Offset. + * @param len Length. + */ + public void write(byte[] arr, int off, int len); + + /** + * Write data from unmanaged memory. + * + * @param addr Address. + * @param cnt Count. + */ + public void write(long addr, int cnt); + + /** + * Close the stream releasing resources. + */ + @Override public void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java new file mode 100644 index 0000000..6021140 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import org.apache.ignite.internal.util.*; + +import sun.misc.*; + +/** + * Naive implementation of portable memory allocator. + */ +public class PortableSimpleMemoryAllocator implements PortableMemoryAllocator { + /** Unsafe. */ + private static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: byte. */ + protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** {@inheritDoc} */ + @Override public byte[] allocate(int size) { + return new byte[size]; + } + + /** {@inheritDoc} */ + @Override public byte[] reallocate(byte[] data, int size) { + byte[] newData = new byte[size]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length); + + return newData; + } + + /** {@inheritDoc} */ + @Override public void release(byte[] data, int maxMsgSize) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public long allocateDirect(int size) { + return UNSAFE.allocateMemory(size); + } + + /** {@inheritDoc} */ + @Override public long reallocateDirect(long addr, int size) { + return UNSAFE.reallocateMemory(addr, size); + } + + /** {@inheritDoc} */ + @Override public void releaseDirect(long addr) { + UNSAFE.freeMemory(addr); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java new file mode 100644 index 0000000..5c84609 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Portable stream. + */ +public interface PortableStream { + /** + * @return Position. + */ + public int position(); + + /** + * @param pos Position. + */ + public void position(int pos); + + /** + * @return Underlying array. + */ + public byte[] array(); + + /** + * @return Copy of data in the stream. + */ + public byte[] arrayCopy(); + + /** + * @return Offheap pointer if stream is offheap based, otherwise {@code 0}. + */ + public long offheapPointer(); + + /** + * @return {@code True} is stream is array based. + */ + public boolean hasArray(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java index 10deaf2..eed2cc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java @@ -61,7 +61,7 @@ public class CacheObjectPortableContext extends CacheObjectContext { if (o == null) return null; - if (keepPortable || !portableEnabled() || !GridPortableUtils.isPortableOrCollectionType(o.getClass())) + if (keepPortable || !portableEnabled() || !PortableUtils.isPortableOrCollectionType(o.getClass())) return o; return unwrapPortable(o); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java index 15b2f00..cf5106a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cacheobject.*; -import org.apache.ignite.internal.processors.portable.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; @@ -86,7 +85,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor }; /** */ - private GridPortableContext portableCtx; + private PortableContext portableCtx; /** */ private Marshaller marsh; @@ -99,7 +98,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor private IgnitePortables portables; /** Metadata updates collected before metadata cache is initialized. */ - private final Map<Integer, GridPortableMetaDataImpl> metaBuf = new ConcurrentHashMap<>(); + private final Map<Integer, PortableMetaDataImpl> metaBuf = new ConcurrentHashMap<>(); /** */ private UUID metaCacheQryId; @@ -202,18 +201,18 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { if (marsh instanceof PortableMarshaller) { - GridPortableMetaDataHandler metaHnd = new GridPortableMetaDataHandler() { - @Override public void addMeta(int typeId, GridPortableMetaDataImpl newMeta) + PortableMetaDataHandler metaHnd = new PortableMetaDataHandler() { + @Override public void addMeta(int typeId, PortableMetaDataImpl newMeta) throws PortableException { if (metaDataCache == null) { - GridPortableMetaDataImpl oldMeta = metaBuf.get(typeId); + PortableMetaDataImpl oldMeta = metaBuf.get(typeId); if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) { synchronized (this) { Map<String, String> fields = new HashMap<>(); if (checkMeta(typeId, oldMeta, newMeta, fields)) { - newMeta = new GridPortableMetaDataImpl(newMeta.typeName(), + newMeta = new PortableMetaDataImpl(newMeta.typeName(), fields, newMeta.affinityKeyFieldName()); @@ -243,7 +242,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor PortableMarshaller pMarh0 = (PortableMarshaller)marsh; - portableCtx = new GridPortableContext(metaHnd, ctx.gridName()); + portableCtx = new PortableContext(metaHnd, ctx.gridName()); IgniteUtils.invoke(PortableMarshaller.class, pMarh0, "setPortableContext", portableCtx); @@ -308,7 +307,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor startLatch.countDown(); - for (Map.Entry<Integer, GridPortableMetaDataImpl> e : metaBuf.entrySet()) + for (Map.Entry<Integer, PortableMetaDataImpl> e : metaBuf.entrySet()) addMeta(e.getKey(), e.getValue()); metaBuf.clear(); @@ -381,7 +380,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor if (type != CacheObject.TYPE_BYTE_ARR) { assert size > 0 : size; - GridPortableInputStream in = new GridPortableOffheapInputStream(ptr, size, forceHeap); + PortableInputStream in = new PortableOffheapInputStream(ptr, size, forceHeap); return portableMarsh.unmarshal(in); } @@ -394,7 +393,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor if (obj == null) return null; - if (GridPortableUtils.isPortableType(obj.getClass())) + if (PortableUtils.isPortableType(obj.getClass())) return obj; if (obj instanceof Object[]) { @@ -444,7 +443,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor assert obj0 instanceof PortableObject; - ((GridPortableObjectImpl)obj0).detachAllowed(true); + ((PortableObjectImpl)obj0).detachAllowed(true); return obj0; } @@ -458,24 +457,24 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor /** {@inheritDoc} */ @Override public PortableBuilder builder(int typeId) { - return new GridPortableBuilderImpl(portableCtx, typeId); + return new PortableBuilderImpl(portableCtx, typeId); } /** {@inheritDoc} */ @Override public PortableBuilder builder(String clsName) { - return new GridPortableBuilderImpl(portableCtx, clsName); + return new PortableBuilderImpl(portableCtx, clsName); } /** {@inheritDoc} */ @Override public PortableBuilder builder(PortableObject portableObj) { - return GridPortableBuilderImpl.wrap(portableObj); + return PortableBuilderImpl.wrap(portableObj); } /** {@inheritDoc} */ @Override public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName, Map<String, Integer> fieldTypeIds) throws PortableException { portableCtx.updateMetaData(typeId, - new GridPortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName)); + new PortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName)); } /** {@inheritDoc} */ @@ -613,7 +612,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor /** * @return Portable context. */ - public GridPortableContext portableContext() { + public PortableContext portableContext() { return portableCtx; } @@ -670,7 +669,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor obj = toPortable(obj); if (obj instanceof PortableObject) - return (GridPortableObjectImpl)obj; + return (PortableObjectImpl)obj; } return toCacheKeyObject0(obj, userObj); @@ -687,15 +686,15 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor obj = toPortable(obj); if (obj instanceof PortableObject) - return (GridPortableObjectImpl)obj; + return (PortableObjectImpl)obj; return toCacheObject0(obj, userObj); } /** {@inheritDoc} */ @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) { - if (type == GridPortableObjectImpl.TYPE_PORTABLE) - return new GridPortableObjectImpl(portableContext(), bytes, 0); + if (type == PortableObjectImpl.TYPE_PORTABLE) + return new PortableObjectImpl(portableContext(), bytes, 0); return super.toCacheObject(ctx, type, bytes); } @@ -708,8 +707,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor Object val = unmarshal(valPtr, !tmp); - if (val instanceof GridPortableObjectOffheapImpl) - return (GridPortableObjectOffheapImpl)val; + if (val instanceof PortableObjectOffheapImpl) + return (PortableObjectOffheapImpl)val; return new CacheObjectImpl(val, null); } @@ -719,8 +718,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled()) return obj; - if (obj instanceof GridPortableObjectOffheapImpl) - return ((GridPortableObjectOffheapImpl)obj).heapCopy(); + if (obj instanceof PortableObjectOffheapImpl) + return ((PortableObjectOffheapImpl)obj).heapCopy(); return obj; } @@ -752,8 +751,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor PortableMetadata newMeta, @Nullable Map<String, String> fields) throws PortableException { assert newMeta != null; - Map<String, String> oldFields = oldMeta != null ? ((GridPortableMetaDataImpl)oldMeta).fieldsMeta() : null; - Map<String, String> newFields = ((GridPortableMetaDataImpl)newMeta).fieldsMeta(); + Map<String, String> oldFields = oldMeta != null ? ((PortableMetaDataImpl)oldMeta).fieldsMeta() : null; + Map<String, String> newFields = ((PortableMetaDataImpl)newMeta).fieldsMeta(); boolean changed = false; @@ -851,7 +850,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor Map<String, String> fields = new HashMap<>(); if (checkMeta(typeId, oldMeta, newMeta, fields)) { - PortableMetadata res = new GridPortableMetaDataImpl(newMeta.typeName(), + PortableMetadata res = new PortableMetaDataImpl(newMeta.typeName(), fields, newMeta.affinityKeyFieldName());