# ignite-394
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e5f68623 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e5f68623 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e5f68623 Branch: refs/heads/ignite-406 Commit: e5f686239d6e98b6b9cbbf754c21b6b3b44e4bf8 Parents: de3dcf0 Author: Artem Shutak <ashu...@gridgain.com> Authored: Tue Mar 3 22:15:42 2015 +0300 Committer: Artem Shutak <ashu...@gridgain.com> Committed: Tue Mar 3 22:15:42 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 4 +- .../dr/GridDrDataLoadCacheUpdater.java | 85 -------------------- .../dr/IgniteDrDataStreamerCacheUpdater.java | 85 ++++++++++++++++++++ 3 files changed, 87 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5f68623/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 61fa262..83118c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3880,7 +3880,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataStream().dataStreamer(ctx.namex(), false); try { - ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); + ldr.updater(new IgniteDrDataStreamerCacheUpdater<K, V>()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc); @@ -4089,7 +4089,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataStream().dataStreamer(ctx.namex(), false); try { - ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); + ldr.updater(new IgniteDrDataStreamerCacheUpdater<K, V>()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5f68623/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java deleted file mode 100644 index 95f7ccb..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.dr; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.dr.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Data center replication cache updater for data streamer. - */ -public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataStreamer.Updater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache0, Collection<Map.Entry<K, V>> col) { - try { - String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName(); - - GridKernalContext ctx = ((IgniteKernal)cache0.unwrap(Ignite.class)).context(); - IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class); - GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); - - assert !F.isEmpty(col); - - if (log.isDebugEnabled()) - log.debug("Running DR put job [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']'); - - IgniteInternalFuture<?> f = cache.context().preloader().startFuture(); - - if (!f.isDone()) - f.get(); - - for (Map.Entry<K, V> entry0 : col) { - GridCacheRawVersionedEntry<K, V> entry = (GridCacheRawVersionedEntry<K, V>)entry0; - - entry.unmarshal(ctx.config().getMarshaller()); - - K key = entry.key(); - - // Ensure that updater to not receive special-purpose values for TTL and expire time. - assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0; - assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0; - - GridCacheDrInfo<V> val = entry.value() != null ? entry.ttl() != CU.TTL_ETERNAL ? - new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) : - new GridCacheDrInfo<>(entry.value(), entry.version()) : null; - - if (val == null) - cache.removeAllConflict(Collections.singletonMap(key, entry.version())); - else - cache.putAllConflict(Collections.singletonMap(key, val)); - } - - if (log.isDebugEnabled()) - log.debug("DR put job finished [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']'); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5f68623/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java new file mode 100644 index 0000000..9de3349 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dr; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Data center replication cache updater for data streamer. + */ +public class IgniteDrDataStreamerCacheUpdater<K, V> implements IgniteDataStreamer.Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache0, Collection<Map.Entry<K, V>> col) { + try { + String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName(); + + GridKernalContext ctx = ((IgniteKernal)cache0.unwrap(Ignite.class)).context(); + IgniteLogger log = ctx.log(IgniteDrDataStreamerCacheUpdater.class); + GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); + + assert !F.isEmpty(col); + + if (log.isDebugEnabled()) + log.debug("Running DR put job [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']'); + + IgniteInternalFuture<?> f = cache.context().preloader().startFuture(); + + if (!f.isDone()) + f.get(); + + for (Map.Entry<K, V> entry0 : col) { + GridCacheRawVersionedEntry<K, V> entry = (GridCacheRawVersionedEntry<K, V>)entry0; + + entry.unmarshal(ctx.config().getMarshaller()); + + K key = entry.key(); + + // Ensure that updater to not receive special-purpose values for TTL and expire time. + assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0; + assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0; + + GridCacheDrInfo<V> val = entry.value() != null ? entry.ttl() != CU.TTL_ETERNAL ? + new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) : + new GridCacheDrInfo<>(entry.value(), entry.version()) : null; + + if (val == null) + cache.removeAllConflict(Collections.singletonMap(key, entry.version())); + else + cache.putAllConflict(Collections.singletonMap(key, val)); + } + + if (log.isDebugEnabled()) + log.debug("DR put job finished [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']'); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } +}