Repository: incubator-ignite Updated Branches: refs/heads/ignite-51 b93f7a3ce -> 814fae3e4
# ignite-51 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/814fae3e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/814fae3e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/814fae3e Branch: refs/heads/ignite-51 Commit: 814fae3e4f171fcac55bb7054d56469772196e8e Parents: b93f7a3 Author: sboikov <sboi...@gridgain.com> Authored: Thu Mar 5 06:27:29 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Mar 5 06:39:12 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 5 - .../processors/cache/GridCacheContext.java | 12 -- .../processors/cache/GridCacheMapEntry.java | 31 +-- .../processors/cache/GridCacheSwapManager.java | 21 -- .../processors/cache/GridCacheValueBytes.java | 194 ------------------- .../GridDistributedCacheAdapter.java | 9 +- .../preloader/GridDhtPartitionSupplyPool.java | 9 - .../dataload/IgniteDataLoaderImpl.java | 9 + .../util/offheap/unsafe/GridUnsafeMemory.java | 20 -- 9 files changed, 17 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 05679ee..5b606c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -489,11 +489,6 @@ public class GridIoMessageFactory implements MessageFactory { break; - case 88: - msg = new GridCacheValueBytes(); - - break; - case 89: msg = new CacheObjectImpl(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 0827ef7..0a386eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1698,18 +1698,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @param bytes Object marshalled with portable marshaller. - * @return Object marshalled with grid marshaller. - * @throws IgniteCheckedException If failed. - */ - // TODO IGNITE-51. - public byte[] convertPortableBytes(byte[] bytes) throws IgniteCheckedException { - assert portableEnabled() && offheapTiered(); - - return marshaller().marshal(portable().unmarshal(bytes, 0)); - } - - /** * @param obj Object. * @return Portable object. * @throws IgniteException In case of error. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index dc48cf7..0caf1f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -247,19 +247,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @param valBytes Value bytes. * @return Length of value. */ - private int valueLength(@Nullable byte[] val, GridCacheValueBytes valBytes) { - assert valBytes != null; - - return val != null ? val.length : valBytes.isNull() ? 0 : valBytes.get().length - (valBytes.isPlain() ? 0 : 6); - } - - /** - * Isolated method to get length of IGFS block. - * - * @param val Value. - * @param valBytes Value bytes. - * @return Length of value. - */ private int valueLength0(@Nullable CacheObject val, @Nullable IgniteBiTuple<byte[], Boolean> valBytes) { byte[] bytes = val != null ? (byte[])val.value(cctx.cacheObjectContext(), false) : null; @@ -473,13 +460,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { e.value(val); } } - else { // Read from swap. + else // Read from swap. valPtr = 0; - - // TODO IGNITE-51. - if (cctx.portableEnabled() && !e.valueIsByteArray()) - e.valueBytes(null); // Clear bytes marshalled with portable marshaller. - } } } else @@ -830,7 +812,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } } - // TODO IGNITE-51. Object storeVal = readThrough(tx0, key, false, subjId, taskName); if (storeVal != null) @@ -3137,7 +3118,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return null; } - // TODO IGNITE-51. return cctx.toCacheObject(cctx.store().loadFromStore(cctx.tm().localTxx(), key)); } @@ -3293,12 +3273,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (isNew()) { CacheObject val = unswapped.value(); - if (cctx.portableEnabled()) { - val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); - - if (cctx.offheapTiered() && !unswapped.valueIsByteArray()) - unswapped.valueBytes(cctx.convertPortableBytes(unswapped.valueBytes())); - } + val = cctx.kernalContext().portable().prepareForCache(val, cctx); // Version does not change for load ops. update(val, @@ -3341,7 +3316,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { long expTime = CU.toExpireTime(ttl); // Detach value before index update. - val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); + val = cctx.kernalContext().portable().prepareForCache(val, cctx); if (val != null) { updateIndex(val, expTime, newVer, old); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 36d33ee..9d4b942 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -419,27 +419,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { e.value(val); } -// TODO IGNITE-51 remove after tested with portables. -// if (e.valueIsByteArray()) -// e.value((V)e.valueBytes()); -// else if (unmarshal) { -// V val; -// -// if (cctx.portableEnabled() && cctx.offheapTiered()) -// val = (V)cctx.portable().unmarshal(e.valueBytes(), 0); -// else { -// ClassLoader ldr = e.valueClassLoaderId() != null ? cctx.deploy().getClassLoader(e.valueClassLoaderId()) : -// cctx.deploy().localLoader(); -// -// if (ldr == null) -// return null; -// -// val = cctx.marshaller().unmarshal(e.valueBytes(), ldr); -// } -// -// e.value(val); -// } - return e; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueBytes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueBytes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueBytes.java deleted file mode 100644 index 7812c5b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueBytes.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.nio.*; - -/** - * Wrapped value bytes of cache entry. - */ -public class GridCacheValueBytes implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Null instance. */ - private static final GridCacheValueBytes NULL = new GridCacheValueBytes(); - - /** - * @param bytes Bytes. - * @return Plain value bytes. - */ - public static GridCacheValueBytes plain(Object bytes) { - assert bytes != null && bytes instanceof byte[]; - - return new GridCacheValueBytes((byte[])bytes, true); - } - - /** - * @param bytes Bytes. - * @return Marshaled value bytes. - */ - public static GridCacheValueBytes marshaled(byte[] bytes) { - assert bytes != null; - - return new GridCacheValueBytes(bytes, false); - } - - /** - * @return Nil value bytes. - */ - public static GridCacheValueBytes nil() { - return NULL; - } - - /** Bytes. */ - private byte[] bytes; - - /** Flag indicating if provided byte array is actual value, not marshaled data. */ - private boolean plain; - - /** - * Private constructor for NULL instance. - */ - public GridCacheValueBytes() { - // No-op. - } - - /** - * Constructor. - * - * @param bytes Bytes. - * @param plain Flag indicating if provided byte array is actual value, not marshaled data. - */ - public GridCacheValueBytes(byte[] bytes, boolean plain) { - this.bytes = bytes; - this.plain = plain; - } - - /** - * @return Bytes. - */ - @Nullable public byte[] get() { - return bytes; - } - - /** - * @return Bytes if this is plain bytes or {@code null} otherwise. - */ - @Nullable public byte[] getIfPlain() { - return plain && bytes != null ? bytes : null; - } - - /** - * @return Bytes if this is marshaled bytes or {@code null} otherwise. - */ - @Nullable public byte[] getIfMarshaled() { - return !plain && bytes != null ? bytes : null; - } - - /** - * @return Flag indicating if provided byte array is actual value, not marshaled data. - */ - public boolean isPlain() { - return plain; - } - - /** - * @return {@code True} if byte array is {@code null}. - */ - public boolean isNull() { - return bytes == null; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray("bytes", bytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeBoolean("plain", plain)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - bytes = reader.readByteArray("bytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - plain = reader.readBoolean("plain"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 88; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheValueBytes.class, this, "len", bytes != null ? bytes.length : -1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index c9f72f1..0adabd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -284,7 +284,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter else dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; - try (IgniteDataLoader<KeyCacheObject, Object> dataLdr = ignite.dataLoader(cacheName)) { + try (IgniteDataLoaderImpl<KeyCacheObject, Object> dataLdr = + (IgniteDataLoaderImpl)ignite.dataLoader(cacheName)) { ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0); dataLdr.updater(GridDataLoadCacheUpdaters.<KeyCacheObject, Object>batched()); @@ -293,7 +294,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter if (!locPart.isEmpty() && locPart.primary(topVer)) { for (GridDhtCacheEntry o : locPart.entries()) { if (!o.obsoleteOrDeleted()) - dataLdr.removeData(o.key()); + dataLdr.removeDataInternal(o.key()); } } } @@ -301,12 +302,12 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer); while (it.hasNext()) - dataLdr.removeData(it.next()); + dataLdr.removeDataInternal(it.next()); it = dht.context().swap().swapKeyIterator(true, false, topVer); while (it.hasNext()) - dataLdr.removeData(it.next()); + dataLdr.removeDataInternal(it.next()); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 2131de9..9ad221e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -370,15 +370,6 @@ class GridDhtPartitionSupplyPool<K, V> { info.ttl(swapEntry.ttl()); info.expireTime(swapEntry.expireTime()); info.version(swapEntry.version()); -// TODO IGNITE-51. -// if (!swapEntry.valueIsByteArray()) { -// if (convertPortable) -// info.valueBytes(cctx.convertPortableBytes(swapEntry.valueBytes())); -// else -// info.valueBytes(swapEntry.valueBytes()); -// } -// else -// info.value(swapEntry.value()); info.value(swapEntry.value()); if (preloadPred == null || preloadPred.apply(info)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index b6ec07e..30fd8bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -414,12 +414,21 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay /** * @param key Key. * @param val Value. + * @return Future. */ public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject val) { return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key, val))); } /** + * @param key Key. + * @return Future. + */ + public IgniteFuture<?> removeDataInternal(KeyCacheObject key) { + return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key, null))); + } + + /** * @param entries Entries. * @return Future. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/814fae3e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMemory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMemory.java index e08c2e2..57fe0bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMemory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMemory.java @@ -457,26 +457,6 @@ public class GridUnsafeMemory { * @param ptr Pointer to read. * @return Stored byte array and "raw bytes" flag. */ - public GridCacheValueBytes getOffHeap(long ptr) { - if (ptr != 0) { - int size = readInt(ptr); - - boolean plain = readByte(ptr + 4) == 1; - byte[] bytes = readBytes(ptr + 5, size); - - return plain ? GridCacheValueBytes.plain(bytes) : GridCacheValueBytes.marshaled(bytes); - } - - return GridCacheValueBytes.nil(); - } - - /** - * Get value stored in offheap along with a flag indicating whether this is "raw bytes", i.e. this is actual value - * or not. - * - * @param ptr Pointer to read. - * @return Stored byte array and "raw bytes" flag. - */ public IgniteBiTuple<byte[], Boolean> get(long ptr) { assert ptr != 0;