Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-gg-9149 [created] d39b47711
9149 logging Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d39b4771 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d39b4771 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d39b4771 Branch: refs/heads/ignite-gg-9149 Commit: d39b47711d6a48e80d6434b483c321203638fc32 Parents: b056a73 Author: Anton Vinogradov <avinogra...@gridgain.com> Authored: Tue Aug 4 13:03:54 2015 +0300 Committer: Anton Vinogradov <avinogra...@gridgain.com> Committed: Tue Aug 4 13:03:54 2015 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamerImpl.java | 22 ++++++++++++++++++++ .../dr/IgniteDrDataStreamerCacheUpdater.java | 15 ++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d39b4771/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 5fae676..5659d6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -472,6 +472,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed return addDataInternal(Collections.singleton(new DataStreamerEntry(key, null))); } + public static ConcurrentHashMap<Integer, ConcurrentLinkedQueue> map = new ConcurrentHashMap<>(); + /** * @param entries Entries. * @return Future. 
@@ -1291,6 +1293,26 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed topVer); try { + + for (DataStreamerEntry entry : entries) { + try { + + byte[] bytes = ((GridCacheRawVersionedEntry)entry).valueBytes(); + if (bytes != null) { + CacheObject val = ctx.config().getMarshaller().unmarshal(bytes, null); + val.finishUnmarshal(cacheObjCtx, null); + + Integer key0 = (Integer)(entry.getKey().value(null, false)); + map.putIfAbsent(key0, new ConcurrentLinkedQueue()); + map.get(key0).add(val.value(null, false)); + + } + // entry.setValue(null); + } + catch (Exception e) { + } + } + ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d39b4771/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java index e5bbe39..b0169c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.dr; import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; @@ -29,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.stream.*; import java.util.*; +import java.util.concurrent.*; /** * Data center replication cache receiver for data streamer. 
@@ -38,6 +41,8 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache /** */ private static final long serialVersionUID = 0L; + public static ConcurrentHashMap<Integer, ConcurrentLinkedQueue> map = new ConcurrentHashMap<>(); + /** {@inheritDoc} */ @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache0, Collection<Map.Entry<KeyCacheObject, CacheObject>> col) { @@ -79,8 +84,16 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache if (val == null) cache.removeAllConflict(Collections.singletonMap(key, entry.version())); - else + else { + Affinity affinity = cache0.unwrap(Ignite.class).affinity(cache0.getName()); + ClusterNode localNode = cache0.unwrap(Ignite.class).cluster().localNode(); + + Integer key0 = (key.value(null, false)); + + map.putIfAbsent(key0, new ConcurrentLinkedQueue()); + map.get(key0).add(localNode.id() + " - " + val.value().value(null, false) + "-prim " + affinity.isPrimary(localNode, key) + " ver " + entry.version()); cache.putAllConflict(Collections.singletonMap(key, val)); + } } if (log.isDebugEnabled())