http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index d6aef65..1932258 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -397,7 +397,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); - CacheObject val = cctx.unswapCacheObject(e.type(), e.valueBytes(), e.valueClassLoaderId()); + CacheObject val = cctx.unswapCacheObject(e.type(), ByteBuffer.wrap(e.valueBytes()), e.valueClassLoaderId()); if (val == null) return null; @@ -422,15 +422,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First check off-heap store. if (offheapEnabled) - if (offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()))) + if (offheap.contains(spaceName, part, key, U.toArray(key.valueBytes(cctx.cacheObjectContext())))) return true; if (swapEnabled) { assert key != null; - byte[] valBytes = swapMgr.read(spaceName, - new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())), - cctx.deploy().globalLoader()); + byte[] keyBytes = U.toArray(key.valueBytes(cctx.cacheObjectContext())); + + SwapKey swapKey = new SwapKey(key.value(cctx.cacheObjectContext(), false), part, keyBytes); + + byte[] valBytes = swapMgr.read(spaceName, swapKey, cctx.deploy().globalLoader()); return valBytes != null; } @@ -518,7 +520,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First try removing from offheap. if (offheapEnabled) { - byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + byte[] entryBytes = offheap.remove(spaceName, part, key, + U.toArray(key.valueBytes(cctx.cacheObjectContext()))); if (entryBytes != null) { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); @@ -577,7 +580,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { SwapKey swapKey = new SwapKey(key.value(cctx.cacheObjectContext(), false), part, - key.valueBytes(cctx.cacheObjectContext())); + U.toArray(key.valueBytes(cctx.cacheObjectContext()))); swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() { @Override public void apply(byte[] rmv) { @@ -652,7 +655,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return null; return read(entry.key(), - entry.key().valueBytes(cctx.cacheObjectContext()), + U.toArray(entry.key().valueBytes(cctx.cacheObjectContext())), entry.partition(), locked, readOffheap, @@ -673,7 +676,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); IgniteBiTuple<Long, Integer> ptr = - offheap.valuePointer(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + offheap.valuePointer(spaceName, part, key, U.toArray(key.valueBytes(cctx.cacheObjectContext()))); if (ptr != null) { assert ptr.get1() != null; @@ -702,7 +705,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - return read(key, key.valueBytes(cctx.cacheObjectContext()), part, false, readOffheap, readSwap); + return read(key, U.toArray(key.valueBytes(cctx.cacheObjectContext())), part, false, readOffheap, readSwap); } /** @@ -740,7 +743,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); byte[] entryBytes = - offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + offheap.remove(spaceName, part, key, U.toArray(key.valueBytes(cctx.cacheObjectContext()))); if (entryBytes != null) { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); @@ -781,7 +784,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { SwapKey swapKey = new SwapKey(key.value(cctx.cacheObjectContext(), false), cctx.affinity().partition(key), - key.valueBytes(cctx.cacheObjectContext())); + U.toArray(key.valueBytes(cctx.cacheObjectContext()))); unprocessedKeys.add(swapKey); } @@ -796,7 +799,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { for (KeyCacheObject key : keys) { SwapKey swapKey = new SwapKey(key.value(cctx.cacheObjectContext(), false), cctx.affinity().partition(key), - key.valueBytes(cctx.cacheObjectContext())); + U.toArray(key.valueBytes(cctx.cacheObjectContext()))); unprocessedKeys.add(swapKey); } @@ -887,7 +890,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - return offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + return offheap.removex(spaceName, part, key, U.toArray(key.valueBytes(cctx.cacheObjectContext()))); } /** @@ -911,7 +914,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.enableEviction(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + offheap.enableEviction(spaceName, part, key, U.toArray(key.valueBytes(cctx.cacheObjectContext()))); } /** @@ -954,7 +957,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false), - key.valueBytes(cctx.cacheObjectContext())); + U.toArray(key.valueBytes(cctx.cacheObjectContext()))); if (val != null) { if (c != null) @@ -967,7 +970,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (swapEnabled) { SwapKey swapKey = new SwapKey(key.value(cctx.cacheObjectContext(), false), part, - key.valueBytes(cctx.cacheObjectContext())); + U.toArray(key.valueBytes(cctx.cacheObjectContext()))); swapMgr.remove(spaceName, swapKey, @@ -1014,7 +1017,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { valClsLdrId); if (offheapEnabled) { - offheap.put(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()), entry.marshal()); + offheap.put(spaceName, part, key, U.toArray(key.valueBytes(cctx.cacheObjectContext())), entry.marshal()); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP)) cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null, @@ -1047,7 +1050,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { offheap.put(spaceName, swapEntry.partition(), swapEntry.key(), - swapEntry.key().valueBytes(cctx.cacheObjectContext()), + U.toArray(swapEntry.key().valueBytes(cctx.cacheObjectContext())), swapEntry.marshal()); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP)) @@ -1064,7 +1067,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { for (GridCacheBatchSwapEntry entry : swapped) { SwapKey swapKey = new SwapKey(entry.key().value(cctx.cacheObjectContext(), false), entry.partition(), - entry.key().valueBytes(cctx.cacheObjectContext())); + U.toArray(entry.key().valueBytes(cctx.cacheObjectContext()))); batch.put(swapKey, entry.marshal()); } @@ -1099,7 +1102,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); swapMgr.write(spaceName, - new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())), + new SwapKey(key.value(cctx.cacheObjectContext(), false), part, + U.toArray(key.valueBytes(cctx.cacheObjectContext()))), entry, cctx.deploy().globalLoader()); @@ -1283,7 +1287,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + offheap.removex(spaceName, part, key, U.toArray(key.valueBytes(cctx.cacheObjectContext()))); } else it.removeX(); @@ -1407,7 +1411,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { cur = new Map.Entry<K, V>() { @Override public K getKey() { try { - KeyCacheObject key = cctx.toCacheKeyObject(cur0.getKey()); + KeyCacheObject key = cctx.toCacheKeyObject(ByteBuffer.wrap(cur0.getKey())); return key.value(cctx.cacheObjectContext(), false); } @@ -1501,7 +1505,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected KeyCacheObject onNext() { try { - cur = cctx.toCacheKeyObject(it.next().getKey()); + cur = cctx.toCacheKeyObject(ByteBuffer.wrap(it.next().getKey())); return cur; } @@ -1601,7 +1605,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + offheap.removex(spaceName, part, key, U.toArray(key.valueBytes(cctx.cacheObjectContext()))); } @Override protected void onClose() throws IgniteCheckedException { @@ -1734,7 +1738,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { swapEntry.type() != CacheObject.TYPE_BYTE_ARR) { // We need value here only for classloading purposes. Object val = cctx.cacheObjects().unmarshal(cctx.cacheObjectContext(), - swapEntry.valueBytes(), + ByteBuffer.wrap(swapEntry.valueBytes()), cctx.deploy().globalLoader()); if (val != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 207a2f1..bc2f5f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -41,6 +41,7 @@ import javax.cache.*; import javax.cache.expiry.*; import javax.cache.integration.*; import java.io.*; +import java.nio.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -1198,7 +1199,7 @@ public class GridCacheUtils { * @throws IgniteCheckedException If marshalling failed. */ @SuppressWarnings("unchecked") - public static byte[] marshal(GridCacheSharedContext ctx, Object obj) + public static ByteBuffer marshal(GridCacheSharedContext ctx, Object obj) throws IgniteCheckedException { assert ctx != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 429b30c..94ed0ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.jetbrains.annotations.*; +import java.nio.*; + /** * */ @@ -38,7 +40,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb * @param val Value. * @param valBytes Value bytes. */ - public KeyCacheObjectImpl(Object val, byte[] valBytes) { + public KeyCacheObjectImpl(Object val, ByteBuffer valBytes) { assert val != null; this.val = val; @@ -55,7 +57,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb } /** {@inheritDoc} */ - @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { + @Override public ByteBuffer valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { if (valBytes == null) valBytes = ctx.processor().marshal(ctx, val); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java index 762455c..ad22c6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java @@ -53,7 +53,7 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem /** */ @GridToStringExclude - private byte[] candsByIdxBytes; + private ByteBuffer candsByIdxBytes; /** Collections of local lock candidates. */ @GridToStringInclude @@ -62,7 +62,7 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem /** Collections of local lock candidates in serialized form. */ @GridToStringExclude - private byte[] candsByKeyBytes; + private ByteBuffer candsByKeyBytes; /** Committed versions with order higher than one for this message (needed for commit ordering). */ @GridToStringInclude @@ -257,14 +257,14 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem switch (writer.state()) { case 3: - if (!writer.writeByteArray("candsByIdxBytes", candsByIdxBytes)) - return false; +// if (!writer.writeByteArray("candsByIdxBytes", candsByIdxBytes)) +// return false; writer.incrementState(); case 4: - if (!writer.writeByteArray("candsByKeyBytes", candsByKeyBytes)) - return false; +// if (!writer.writeByteArray("candsByKeyBytes", candsByKeyBytes)) +// return false; writer.incrementState(); @@ -303,7 +303,7 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem switch (reader.state()) { case 3: - candsByIdxBytes = reader.readByteArray("candsByIdxBytes"); +// candsByIdxBytes = reader.readByteArray("candsByIdxBytes"); if (!reader.isLastRead()) return false; @@ -311,7 +311,7 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem reader.incrementState(); case 4: - candsByKeyBytes = reader.readByteArray("candsByKeyBytes"); +// candsByKeyBytes = reader.readByteArray("candsByKeyBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java index b5e8d61..2869fe5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java @@ -47,7 +47,7 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { private Throwable err; /** Serialized error. */ - private byte[] errBytes; + private ByteBuffer errBytes; /** Values. */ @GridToStringInclude @@ -244,8 +244,8 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { switch (writer.state()) { case 8: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; +// if (!writer.writeByteArray("errBytes", errBytes)) +// return false; writer.incrementState(); @@ -278,7 +278,7 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { switch (reader.state()) { case 8: - errBytes = reader.readByteArray("errBytes"); +// errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index ec02e6e..b3d8d4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -95,7 +95,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** Group lock key bytes. */ @GridToStringExclude - private byte[] grpLockKeyBytes; + private ByteBuffer grpLockKeyBytes; /** Partition lock flag. */ private boolean partLock; @@ -108,7 +108,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage private Map<UUID, Collection<UUID>> txNodes; /** */ - private byte[] txNodesBytes; + private ByteBuffer txNodesBytes; /** One phase commit flag. */ private boolean onePhaseCommit; @@ -397,8 +397,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage writer.incrementState(); case 11: - if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes)) - return false; +// if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes)) +// return false; writer.incrementState(); @@ -457,8 +457,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage writer.incrementState(); case 21: - if (!writer.writeByteArray("txNodesBytes", txNodesBytes)) - return false; +// if (!writer.writeByteArray("txNodesBytes", txNodesBytes)) +// return false; writer.incrementState(); @@ -525,7 +525,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 11: - grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes"); +// grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes"); if (!reader.isLastRead()) return false; @@ -613,7 +613,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 21: - txNodesBytes = reader.readByteArray("txNodesBytes"); +// txNodesBytes = reader.readByteArray("txNodesBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java index adece8c..67fcb47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java @@ -43,7 +43,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage private Map<KeyCacheObject, Collection<GridCacheMvccCandidate>> cands; /** */ - private byte[] candsBytes; + private ByteBuffer candsBytes; /** Error. */ @GridToStringExclude @@ -51,7 +51,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage private Throwable err; /** Serialized error. */ - private byte[] errBytes; + private ByteBuffer errBytes; /** * Empty constructor (required by {@link Externalizable}). @@ -164,14 +164,14 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage switch (writer.state()) { case 8: - if (!writer.writeByteArray("candsBytes", candsBytes)) - return false; +// if (!writer.writeByteArray("candsBytes", candsBytes)) +// return false; writer.incrementState(); case 9: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; +// if (!writer.writeByteArray("errBytes", errBytes)) +// return false; writer.incrementState(); @@ -192,7 +192,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage switch (reader.state()) { case 8: - candsBytes = reader.readByteArray("candsBytes"); +// candsBytes = reader.readByteArray("candsBytes"); if (!reader.isLastRead()) return false; @@ -200,7 +200,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage reader.incrementState(); case 9: - errBytes = reader.readByteArray("errBytes"); +// errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index 0c4835f..9802421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -44,7 +44,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { private List<List<ClusterNode>> affAssignment; /** Affinity assignment bytes. */ - private byte[] affAssignmentBytes; + private ByteBuffer affAssignmentBytes; /** * Empty constructor. @@ -127,8 +127,8 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { switch (writer.state()) { case 3: - if (!writer.writeByteArray("affAssignmentBytes", affAssignmentBytes)) - return false; +// if (!writer.writeByteArray("affAssignmentBytes", affAssignmentBytes)) +// return false; writer.incrementState(); @@ -155,7 +155,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { switch (reader.state()) { case 3: - affAssignmentBytes = reader.readByteArray("affAssignmentBytes"); +// affAssignmentBytes = reader.readByteArray("affAssignmentBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 8a48371..3182b91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -59,7 +59,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { private Map<KeyCacheObject, GridCacheVersion> owned; /** Owner mapped version bytes. */ - private byte[] ownedBytes; + private ByteBuffer ownedBytes; /** Topology version. */ private long topVer; @@ -352,8 +352,8 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { writer.incrementState(); case 26: - if (!writer.writeByteArray("ownedBytes", ownedBytes)) - return false; +// if (!writer.writeByteArray("ownedBytes", ownedBytes)) +// return false; writer.incrementState(); @@ -430,7 +430,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 26: - ownedBytes = reader.readByteArray("ownedBytes"); +// ownedBytes = reader.readByteArray("ownedBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index ddf0d55..86e9fa6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -102,23 +102,23 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid private List<EntryProcessor<Object, Object, Object>> entryProcessors; /** Entry processors bytes. */ - @GridDirectCollection(byte[].class) - private List<byte[]> entryProcessorsBytes; + @GridDirectCollection(ByteBuffer.class) + private List<ByteBuffer> entryProcessorsBytes; /** Near entry processors. */ @GridDirectTransient private List<EntryProcessor<Object, Object, Object>> nearEntryProcessors; /** Near entry processors bytes. */ - @GridDirectCollection(byte[].class) - private List<byte[]> nearEntryProcessorsBytes; + @GridDirectCollection(ByteBuffer.class) + private List<ByteBuffer> nearEntryProcessorsBytes; /** Optional arguments for entry processor. */ @GridDirectTransient private Object[] invokeArgs; /** Entry processor arguments bytes. */ - private byte[][] invokeArgsBytes; + private ByteBuffer[] invokeArgsBytes; /** Subject ID. */ private UUID subjId; @@ -758,7 +758,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 8: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); +// invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index d0a7620..4f8d759 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -52,7 +52,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri private IgniteCheckedException err; /** Serialized update error. */ - private byte[] errBytes; + private ByteBuffer errBytes; /** Evicted readers. */ @GridToStringInclude @@ -181,8 +181,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri switch (writer.state()) { case 3: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; +// if (!writer.writeByteArray("errBytes", errBytes)) +// return false; writer.incrementState(); @@ -221,7 +221,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri switch (reader.state()) { case 3: - errBytes = reader.readByteArray("errBytes"); +// errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index b72a8e6..4b90ee9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -83,15 +83,15 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri private List<EntryProcessor<Object, Object, Object>> entryProcessors; /** Entry processors bytes. */ - @GridDirectCollection(byte[].class) - private List<byte[]> entryProcessorsBytes; + @GridDirectCollection(ByteBuffer.class) + private List<ByteBuffer> entryProcessorsBytes; /** Optional arguments for entry processor. */ @GridDirectTransient private Object[] invokeArgs; /** Entry processor arguments bytes. */ - private byte[][] invokeArgsBytes; + private ByteBuffer[] invokeArgsBytes; /** Conflict versions. */ @GridDirectCollection(GridCacheVersion.class) @@ -111,7 +111,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri private ExpiryPolicy expiryPlc; /** Expiry policy bytes. */ - private byte[] expiryPlcBytes; + private ByteBuffer expiryPlcBytes; /** Filter. */ private CacheEntryPredicate[] filter; @@ -592,8 +592,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 7: - if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) - return false; +// if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) +// return false; writer.incrementState(); @@ -736,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 7: - expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); +// expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); if (!reader.isLastRead()) return false; @@ -784,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 13: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); +// invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 01d5722..1d51cc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -54,7 +54,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr private volatile IgniteCheckedException err; /** Serialized error. */ - private byte[] errBytes; + private ByteBuffer errBytes; /** Return value. */ @GridToStringInclude @@ -421,8 +421,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr switch (writer.state()) { case 3: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; +// if (!writer.writeByteArray("errBytes", errBytes)) +// return false; writer.incrementState(); @@ -503,7 +503,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr switch (reader.state()) { case 3: - errBytes = reader.readByteArray("errBytes"); +// errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 2b35c6e..13d3a21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -48,7 +48,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { private Object topic; /** Serialized topic. */ - private byte[] topicBytes; + private ByteBuffer topicBytes; /** Timeout. */ private long timeout; @@ -223,8 +223,8 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { writer.incrementState(); case 6: - if (!writer.writeByteArray("topicBytes", topicBytes)) - return false; +// if (!writer.writeByteArray("topicBytes", topicBytes)) +// return false; writer.incrementState(); @@ -281,7 +281,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { reader.incrementState(); case 6: - topicBytes = reader.readByteArray("topicBytes"); +// topicBytes = reader.readByteArray("topicBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 7b1786d..058298e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -33,6 +33,7 @@ import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; import java.io.*; +import java.nio.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; @@ -364,7 +365,7 @@ class GridDhtPartitionSupplyPool<K, V> { GridCacheEntryInfo info = new GridCacheEntryInfo(); - info.keyBytes(e.getKey()); + info.keyBytes(ByteBuffer.wrap(e.getKey())); info.ttl(swapEntry.ttl()); info.expireTime(swapEntry.expireTime()); info.version(swapEntry.version()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 056ddcc..7008b80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -43,7 +43,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private Map<Integer, GridDhtPartitionFullMap> parts = new HashMap<>(); /** */ - private byte[] partsBytes; + private ByteBuffer partsBytes; /** Topology version. */ private long topVer; @@ -131,8 +131,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (writer.state()) { case 5: - if (!writer.writeByteArray("partsBytes", partsBytes)) - return false; +// if (!writer.writeByteArray("partsBytes", partsBytes)) +// return false; writer.incrementState(); @@ -159,7 +159,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (reader.state()) { case 5: - partsBytes = reader.readByteArray("partsBytes"); +// partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 66140cd..851ffc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -43,7 +43,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes private Map<Integer, GridDhtPartitionMap> parts = new HashMap<>(); /** Serialized partitions. */ - private byte[] partsBytes; + private ByteBuffer partsBytes; /** * Required by {@link Externalizable}. @@ -110,8 +110,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (writer.state()) { case 5: - if (!writer.writeByteArray("partsBytes", partsBytes)) - return false; +// if (!writer.writeByteArray("partsBytes", partsBytes)) +// return false; writer.incrementState(); @@ -132,7 +132,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (reader.state()) { case 5: - partsBytes = reader.readByteArray("partsBytes"); +// partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java index 5d07930..b25f4bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java @@ -66,7 +66,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe private Throwable err; /** Serialized error. */ - private byte[] errBytes; + private ByteBuffer errBytes; /** * Empty constructor required for {@link Externalizable}. @@ -220,8 +220,8 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe writer.incrementState(); case 4: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; +// if (!writer.writeByteArray("errBytes", errBytes)) +// return false; writer.incrementState(); @@ -280,7 +280,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe reader.incrementState(); case 4: - errBytes = reader.readByteArray("errBytes"); +// errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java index 68e23ce..0c53a09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java @@ -42,7 +42,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse { private Throwable err; /** Serialized error. */ - private byte[] errBytes; + private ByteBuffer errBytes; /** Mini future ID. */ private IgniteUuid miniId; @@ -129,8 +129,8 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse { switch (writer.state()) { case 5: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; +// if (!writer.writeByteArray("errBytes", errBytes)) +// return false; writer.incrementState(); @@ -163,7 +163,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse { switch (reader.state()) { case 5: - errBytes = reader.readByteArray("errBytes"); +// errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index da75742..4fbfe34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -45,6 +45,7 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.expiry.*; import java.io.*; +import java.nio.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; @@ -311,15 +312,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * Writes key-value pair to index. * * @param key Key. - * @param keyBytes Byte array with key data. * @param val Value. * @param valBytes Value bytes. * @param ver Cache entry version. * @param expirationTime Expiration time or 0 if never expires. * @throws IgniteCheckedException In case of error. */ - public void store(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - GridCacheVersion ver, long expirationTime) + public void store(K key, @Nullable V val, @Nullable ByteBuffer valBytes, GridCacheVersion ver, long expirationTime) throws IgniteCheckedException { assert key != null; assert val != null || valBytes != null; @@ -334,7 +333,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (val == null) val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); - qryProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime); + qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime); } finally { invalidateResultCache(); @@ -693,19 +692,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (data == null) data = Collections.emptyList(); - final GridIterator<IgniteBiTuple<K, V>> it = F.iterator( - data, - new C1<GridCacheSetItemKey, IgniteBiTuple<K, V>>() { + final GridIterator<IgniteBiTuple<K, V>> it = F.iterator(data, new C1<GridCacheSetItemKey, IgniteBiTuple<K, + V>>() { @Override public IgniteBiTuple<K, V> apply(GridCacheSetItemKey e) { return new IgniteBiTuple<>((K)e.item(), (V)Boolean.TRUE); } - }, - true, - new P1<GridCacheSetItemKey>() { + }, true, new P1<GridCacheSetItemKey>() { @Override public boolean apply(GridCacheSetItemKey e) { return filter.apply(e, null); } - }); + } + ); return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { @Override protected boolean onHasNext() { @@ -2261,7 +2258,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @return Key bytes. */ - protected abstract byte[] keyBytes(); + protected abstract ByteBuffer keyBytes(); /** * @return Value. @@ -2334,8 +2331,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override protected byte[] keyBytes() { - return e.getKey(); + @Override protected ByteBuffer keyBytes() { + return ByteBuffer.wrap(e.getKey()); } /** {@inheritDoc} */ @@ -2343,7 +2340,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @Override protected V unmarshalValue() throws IgniteCheckedException { IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue()); - CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); + CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), + t.get2(), ByteBuffer.wrap(t.get1())); return obj.value(cctx.cacheObjectContext(), false); } @@ -2387,8 +2385,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ - @Override protected byte[] keyBytes() { - return U.copyMemory(keyPtr.get1(), keyPtr.get2()); + @Override protected ByteBuffer keyBytes() { + return ByteBuffer.wrap(U.copyMemory(keyPtr.get1(), keyPtr.get2())); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 845077f..03fa54e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -62,28 +62,28 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache private IgniteBiPredicate<Object, Object> keyValFilter; /** */ - private byte[] keyValFilterBytes; + private ByteBuffer keyValFilterBytes; /** */ @GridDirectTransient private IgniteReducer<Object, Object> rdc; /** */ - private byte[] rdcBytes; + private ByteBuffer rdcBytes; /** */ @GridDirectTransient private IgniteClosure<Object, Object> trans; /** */ - private byte[] transBytes; + private ByteBuffer transBytes; /** */ @GridDirectTransient private Object[] args; /** */ - private byte[] argsBytes; + private ByteBuffer argsBytes; /** */ private int pageSize; @@ -436,8 +436,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 4: - if (!writer.writeByteArray("argsBytes", argsBytes)) - return false; +// if (!writer.writeByteArray("argsBytes", argsBytes)) +// return false; writer.incrementState(); @@ -496,8 +496,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 14: - if (!writer.writeByteArray("keyValFilterBytes", keyValFilterBytes)) - return false; +// if (!writer.writeByteArray("keyValFilterBytes", keyValFilterBytes)) +// return false; writer.incrementState(); @@ -508,8 +508,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 16: - if (!writer.writeByteArray("rdcBytes", rdcBytes)) - return false; +// if (!writer.writeByteArray("rdcBytes", rdcBytes)) +// return false; writer.incrementState(); @@ -526,8 +526,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 19: - if (!writer.writeByteArray("transBytes", transBytes)) - return false; +// if (!writer.writeByteArray("transBytes", transBytes)) +// return false; writer.incrementState(); @@ -562,7 +562,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 4: - argsBytes = reader.readByteArray("argsBytes"); +// argsBytes = reader.readByteArray("argsBytes"); if (!reader.isLastRead()) return false; @@ -642,7 +642,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 14: - keyValFilterBytes = reader.readByteArray("keyValFilterBytes"); +// keyValFilterBytes = reader.readByteArray("keyValFilterBytes"); if (!reader.isLastRead()) return false; @@ -658,7 +658,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 16: - rdcBytes = reader.readByteArray("rdcBytes"); +// rdcBytes = reader.readByteArray("rdcBytes"); if (!reader.isLastRead()) return false; @@ -682,7 +682,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 19: - transBytes = reader.readByteArray("transBytes"); +// transBytes = reader.readByteArray("transBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java index 1988cff..82612f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java @@ -49,14 +49,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach private Throwable err; /** */ - private byte[] errBytes; + private ByteBuffer errBytes; /** */ private boolean fields; /** */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> metaDataBytes; + @GridDirectCollection(ByteBuffer.class) + private Collection<ByteBuffer> metaDataBytes; /** */ @GridToStringInclude @@ -64,8 +64,8 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach private List<GridQueryFieldMetadata> metadata; /** */ - @GridDirectCollection(byte[].class) - private Collection<byte[]> dataBytes; + @GridDirectCollection(ByteBuffer.class) + private Collection<ByteBuffer> dataBytes; /** */ @GridDirectTransient @@ -223,8 +223,8 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach writer.incrementState(); case 4: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; +// if (!writer.writeByteArray("errBytes", errBytes)) +// return false; writer.incrementState(); @@ -277,7 +277,7 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach reader.incrementState(); case 4: - errBytes = reader.readByteArray("errBytes"); +// errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 1cd549a..48c96c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -32,6 +32,7 @@ import org.jetbrains.annotations.*; import javax.cache.event.*; import javax.cache.event.EventType; import java.io.*; +import java.nio.*; import java.util.*; import static org.apache.ignite.events.EventType.*; @@ -435,7 +436,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private static final long serialVersionUID = 0L; /** Serialized object. */ - private byte[] bytes; + private ByteBuffer bytes; /** Deployment class name. */ private String clsName; @@ -493,14 +494,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeByteArray(out, bytes); + U.writeByteBuffer(out, bytes); U.writeString(out, clsName); out.writeObject(depInfo); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - bytes = U.readByteArray(in); + bytes = U.readByteBuffer(in); clsName = U.readString(in); depInfo = (GridDeploymentInfo)in.readObject(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java index 43dc81f..c2a63a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java @@ -132,7 +132,7 @@ public class GridCacheQueryJdbcMetadataTask extends ComputeTaskAdapter<String, b status = 0; - data = MARSHALLER.marshal(F.asList(schemasMap, indexesInfo)); + data = U.toArray(MARSHALLER.marshal(F.asList(schemasMap, indexesInfo))); } catch (Throwable t) { U.error(log, "Failed to get metadata for JDBC.", t); @@ -142,7 +142,7 @@ public class GridCacheQueryJdbcMetadataTask extends ComputeTaskAdapter<String, b status = 1; try { - data = MARSHALLER.marshal(err); + data = U.toArray(MARSHALLER.marshal(err)); } catch (IgniteCheckedException e) { throw new IgniteException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java index 02fe066..817e710 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java @@ -33,6 +33,7 @@ import org.apache.ignite.resources.*; import java.math.*; import java.net.*; +import java.nio.*; import java.sql.*; import java.util.*; import java.util.Date; @@ -62,7 +63,7 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { try { assert arg != null; - Map<String, Object> args = MARSHALLER.unmarshal(arg, null); + Map<String, Object> args = MARSHALLER.unmarshal(ByteBuffer.wrap(arg), null); boolean first = true; @@ -107,12 +108,12 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { if (res.getException() == null) { status = 0; - bytes = MARSHALLER.marshal(res.getData()); + bytes = U.toArray(MARSHALLER.marshal(res.getData())); } else { status = 1; - bytes = MARSHALLER.marshal(new SQLException(res.getException().getMessage())); + bytes = U.toArray(MARSHALLER.marshal(new SQLException(res.getException().getMessage()))); } byte[] packet = new byte[bytes.length + 1]; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 95d3527..fb9e32a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -80,7 +80,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Transform closure bytes. */ @GridToStringExclude - private byte[] transformClosBytes; + private ByteBuffer transformClosBytes; /** Time to live. */ private long ttl; @@ -148,7 +148,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private boolean transferExpiryPlc; /** Expiry policy bytes. */ - private byte[] expiryPlcBytes; + private ByteBuffer expiryPlcBytes; /** * Required by {@link Externalizable} @@ -794,8 +794,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); case 3: - if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) - return false; +// if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) +// return false; writer.incrementState(); @@ -825,8 +825,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); case 8: - if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) - return false; +// if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) +// return false; writer.incrementState(); @@ -880,7 +880,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 3: - expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); +// expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); if (!reader.isLastRead()) return false; @@ -920,7 +920,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 8: - transformClosBytes = reader.readByteArray("transformClosBytes"); +// transformClosBytes = reader.readByteArray("transformClosBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java index 500e466..57371d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java @@ -39,10 +39,10 @@ public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry impl /** Key bytes. */ @GridDirectTransient - private byte[] keyBytes; + private ByteBuffer keyBytes; /** Value bytes. */ - private byte[] valBytes; + private ByteBuffer valBytes; /** TTL. */ private long ttl; @@ -93,8 +93,9 @@ public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry impl * @param ttl TTL. * @param ver Version. */ - public GridCacheRawVersionedEntry(byte[] keyBytes, - byte[] valBytes, + public GridCacheRawVersionedEntry( + ByteBuffer keyBytes, + ByteBuffer valBytes, long ttl, long expireTime, GridCacheVersion ver) { @@ -122,7 +123,7 @@ public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry impl /** * @return Key bytes. */ - public byte[] keyBytes() { + public ByteBuffer keyBytes() { return keyBytes; } @@ -134,7 +135,7 @@ public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry impl /** * @return Value bytes. */ - public byte[] valueBytes() { + public ByteBuffer valueBytes() { return valBytes; } @@ -287,7 +288,7 @@ public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry impl reader.incrementState(); case 4: - valBytes = reader.readByteArray("valBytes"); +// valBytes = reader.readByteArray("valBytes"); if (!reader.isLastRead()) return false; @@ -341,8 +342,8 @@ public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry impl writer.incrementState(); case 4: - if (!writer.writeByteArray("valBytes", valBytes)) - return false; +// if (!writer.writeByteArray("valBytes", valBytes)) +// return false; writer.incrementState(); @@ -374,8 +375,6 @@ public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry impl /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridCacheRawVersionedEntry.class, this, "keyBytesLen", - keyBytes != null ? keyBytes.length : "n/a", "valBytesLen", - valBytes != null ? valBytes.length : "n/a"); + return S.toString(GridCacheRawVersionedEntry.class, this); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index ce2c579..6ed9df4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -24,6 +24,8 @@ import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; import org.jetbrains.annotations.*; +import java.nio.*; + /** * Cache objects processor. */ @@ -92,7 +94,7 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { * @return Value bytes. * @throws IgniteCheckedException If failed. */ - public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException; + public ByteBuffer marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException; /** * @param ctx Context. @@ -101,7 +103,7 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { * @return Unmarshalled object. * @throws IgniteCheckedException If failed. */ - public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr) throws IgniteCheckedException; + public Object unmarshal(CacheObjectContext ctx, ByteBuffer bytes, ClassLoader clsLdr) throws IgniteCheckedException; /** * @param node Node. @@ -134,7 +136,7 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { * @param bytes Object bytes. * @return Cache object. */ - public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes); + public CacheObject toCacheObject(CacheObjectContext ctx, byte type, ByteBuffer bytes); /** * @param ctx Context. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 1966a21..a0b3d52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -31,6 +31,7 @@ import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.math.*; +import java.nio.*; import java.util.*; import static org.apache.ignite.cache.CacheMemoryMode.*; @@ -83,12 +84,12 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ - @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException { + @Override public ByteBuffer marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException { return CU.marshal(ctx.kernalContext().cache().context(), val); } /** {@inheritDoc} */ - @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr) + @Override public Object unmarshal(CacheObjectContext ctx, ByteBuffer bytes, ClassLoader clsLdr) throws IgniteCheckedException { return ctx.kernalContext().cache().context().marshaller().unmarshal(bytes, clsLdr); @@ -163,17 +164,18 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme ClassLoader ldr = valClsLdrId != null ? ctx.deploy().getClassLoader(valClsLdrId) : ctx.deploy().localLoader(); - return toCacheObject(ctx.cacheObjectContext(), unmarshal(ctx.cacheObjectContext(), bytes, ldr), false); + return toCacheObject(ctx.cacheObjectContext(), unmarshal(ctx.cacheObjectContext(), + ByteBuffer.wrap(bytes), ldr), false); } else - return toCacheObject(ctx.cacheObjectContext(), type, bytes); + return toCacheObject(ctx.cacheObjectContext(), type, ByteBuffer.wrap(bytes)); } /** {@inheritDoc} */ - @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) { + @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, ByteBuffer bytes) { switch (type) { case CacheObject.TYPE_BYTE_ARR: - return new CacheObjectByteArrayImpl(bytes); + return new CacheObjectByteArrayImpl(U.toArray(bytes)); case CacheObject.TYPE_REGULAR: return new CacheObjectImpl(null, bytes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 1fe0223..00b4359 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -36,6 +36,7 @@ import org.apache.ignite.resources.*; import org.jetbrains.annotations.*; import java.io.*; +import java.nio.*; import java.util.*; import java.util.concurrent.*; @@ -1015,7 +1016,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { private boolean hadLocNode; /** */ - private byte[] closureBytes; + private ByteBuffer closureBytes; /** */ private IgniteClosure<?, ?> closure; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java index fe50fd8..f788ad7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java @@ -53,7 +53,7 @@ public class GridContinuousMessage implements Message { private Collection<Message> msgs; /** Serialized message data. */ - private byte[] dataBytes; + private ByteBuffer dataBytes; /** Future ID for synchronous event notifications. */ private IgniteUuid futId; @@ -129,14 +129,14 @@ public class GridContinuousMessage implements Message { /** * @return Serialized message data. */ - public byte[] dataBytes() { + public ByteBuffer dataBytes() { return dataBytes; } /** * @param dataBytes Serialized message data. */ - public void dataBytes(byte[] dataBytes) { + public void dataBytes(ByteBuffer dataBytes) { this.dataBytes = dataBytes; } @@ -160,8 +160,8 @@ public class GridContinuousMessage implements Message { switch (writer.state()) { case 0: - if (!writer.writeByteArray("dataBytes", dataBytes)) - return false; +// if (!writer.writeByteArray("dataBytes", dataBytes)) +// return false; writer.incrementState(); @@ -203,7 +203,7 @@ public class GridContinuousMessage implements Message { switch (reader.state()) { case 0: - dataBytes = reader.readByteArray("dataBytes"); +// dataBytes = reader.readByteArray("dataBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 1a177ad..c6e975b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -41,6 +41,7 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import java.io.*; +import java.nio.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; @@ -1365,7 +1366,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private IgnitePredicate<ClusterNode> prjPred; /** Serialized projection predicate. */ - private byte[] prjPredBytes; + private ByteBuffer prjPredBytes; /** Deployment class name. */ private String clsName; @@ -1443,7 +1444,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { out.writeBoolean(b); if (b) { - U.writeByteArray(out, prjPredBytes); + U.writeByteBuffer(out, prjPredBytes); U.writeString(out, clsName); out.writeObject(depInfo); } @@ -1462,7 +1463,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { boolean b = in.readBoolean(); if (b) { - prjPredBytes = U.readByteArray(in); + prjPredBytes = U.readByteBuffer(in); clsName = U.readString(in); depInfo = (GridDeploymentInfo)in.readObject(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d838a9db/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java index 2368fb2..e7d0f65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java @@ -39,13 +39,13 @@ public class GridDataLoadRequest implements Message { private long reqId; /** */ - private byte[] resTopicBytes; + private ByteBuffer resTopicBytes; /** Cache name. */ private String cacheName; /** */ - private byte[] updaterBytes; + private ByteBuffer updaterBytes; /** Entries to update. */ @GridDirectCollection(IgniteDataLoaderEntry.class) @@ -100,9 +100,9 @@ public class GridDataLoadRequest implements Message { * @param forceLocDep Force local deployment. */ public GridDataLoadRequest(long reqId, - byte[] resTopicBytes, + ByteBuffer resTopicBytes, @Nullable String cacheName, - byte[] updaterBytes, + ByteBuffer updaterBytes, Collection<IgniteDataLoaderEntry> entries, boolean ignoreDepOwnership, boolean skipStore, @@ -137,7 +137,7 @@ public class GridDataLoadRequest implements Message { /** * @return Response topic. */ - public byte[] responseTopicBytes() { + public ByteBuffer responseTopicBytes() { return resTopicBytes; } @@ -151,7 +151,7 @@ public class GridDataLoadRequest implements Message { /** * @return Updater. */ - public byte[] updaterBytes() { + public ByteBuffer updaterBytes() { return updaterBytes; } @@ -284,8 +284,8 @@ public class GridDataLoadRequest implements Message { writer.incrementState(); case 8: - if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) - return false; +// if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) +// return false; writer.incrementState(); @@ -302,8 +302,8 @@ public class GridDataLoadRequest implements Message { writer.incrementState(); case 11: - if (!writer.writeByteArray("updaterBytes", updaterBytes)) - return false; +// if (!writer.writeByteArray("updaterBytes", updaterBytes)) +// return false; writer.incrementState(); @@ -395,7 +395,7 @@ public class GridDataLoadRequest implements Message { reader.incrementState(); case 8: - resTopicBytes = reader.readByteArray("resTopicBytes"); +// resTopicBytes = reader.readByteArray("resTopicBytes"); if (!reader.isLastRead()) return false; @@ -419,7 +419,7 @@ public class GridDataLoadRequest implements Message { reader.incrementState(); case 11: - updaterBytes = reader.readByteArray("updaterBytes"); +// updaterBytes = reader.readByteArray("updaterBytes"); if (!reader.isLastRead()) return false;