Repository: incubator-ignite Updated Branches: refs/heads/ignite-394 e5f686239 -> 9c8217c17
# gg-9869 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/96f426bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/96f426bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/96f426bb Branch: refs/heads/ignite-394 Commit: 96f426bb0b5b3d19badac0610726f58ca4a0c15e Parents: e5f6862 Author: Artem Shutak <ashu...@gridgain.com> Authored: Wed Mar 4 15:24:33 2015 +0300 Committer: Artem Shutak <ashu...@gridgain.com> Committed: Wed Mar 4 15:24:33 2015 +0300 ---------------------------------------------------------------------- .../ignite/examples/CacheExamplesSelfTest.java | 2 +- .../org/apache/ignite/IgniteDataStreamer.java | 2 +- .../GridDistributedCacheAdapter.java | 2 +- .../dataload/GridDataLoadCacheUpdaters.java | 199 ------------------- .../dataload/GridDataLoadUpdateJob.java | 119 ----------- .../IgniteDataStreamerCacheUpdaters.java | 199 +++++++++++++++++++ .../dataload/IgniteDataStreamerImpl.java | 10 +- .../dataload/IgniteDataStreamerProcessor.java | 2 +- .../dataload/IgniteDataStreamerUpdateJob.java | 119 +++++++++++ .../processors/igfs/IgfsDataManager.java | 2 +- .../IgniteDataStreamerImplSelfTest.java | 4 +- .../IgniteDataStreamerPerformanceTest.java | 2 +- .../IgniteDataStreamerProcessorSelfTest.java | 22 +- 13 files changed, 342 insertions(+), 342 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java index c5c4599..14af44f 100644 --- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java +++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java @@ -114,7 +114,7 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest { /** * @throws Exception If failed. */ - public void testCacheDataLoaderExample() throws Exception { + public void testCacheDataStreamerExample() throws Exception { CacheDataStreamerExample.main(EMPTY_ARGS); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index 22aa0c1..a47c079 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -33,7 +33,7 @@ import java.util.*; * data may get to remote nodes in different order from which it was added to * the loader. * <p> - * Also note that {@code GridDataLoader} is not the only way to load data into cache. + * Also note that {@code IgniteDataStreamer} is not the only way to load data into cache. * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)} * method to load data from underlying data store. You can also use standard * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 5791b8d..16419f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -279,7 +279,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try (IgniteDataStreamer<K, V> dataLdr = ignite.dataStreamer(cacheName)) { ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0); - dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched()); + dataLdr.updater(IgniteDataStreamerCacheUpdaters.<K, V>batched()); for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) { if (!locPart.isEmpty() && locPart.primary(topVer)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java deleted file mode 100644 index 78a7e62..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Bundled factory for cache updaters. - */ -public class GridDataLoadCacheUpdaters { - /** */ - private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual(); - - /** */ - private static final IgniteDataStreamer.Updater BATCHED = new Batched(); - - /** */ - private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted(); - - /** - * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance - * is not the best. - * - * @return Single updater. - */ - public static <K, V> IgniteDataStreamer.Updater<K, V> individual() { - return INDIVIDUAL; - } - - /** - * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting - * updated concurrently. Performance is generally better than in {@link #individual()}. - * - * @return Batched updater. - */ - public static <K, V> IgniteDataStreamer.Updater<K, V> batched() { - return BATCHED; - } - - /** - * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates - * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}. - * - * @return Batched sorted updater. - */ - public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() { - return BATCHED_SORTED; - } - - /** - * Updates cache. - * - * @param cache Cache. - * @param rmvCol Keys to remove. - * @param putMap Entries to put. - * @throws IgniteException If failed. - */ - protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol, - Map<K, V> putMap) { - assert rmvCol != null || putMap != null; - - // Here we assume that there are no key duplicates, so the following calls are valid. - if (rmvCol != null) - ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol); - - if (putMap != null) - cache.putAll(putMap); - } - - /** - * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone. - */ - private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { - assert cache != null; - assert !F.isEmpty(entries); - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key != null; - - V val = entry.getValue(); - - if (val == null) - cache.remove(key); - else - cache.put(key, val); - } - } - } - - /** - * Batched updater. Updates cache using batch operations thus is dead lock prone. - */ - private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { - assert cache != null; - assert !F.isEmpty(entries); - - Map<K, V> putAll = null; - Set<K> rmvAll = null; - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key != null; - - V val = entry.getValue(); - - if (val == null) { - if (rmvAll == null) - rmvAll = new HashSet<>(); - - rmvAll.add(key); - } - else { - if (putAll == null) - putAll = new HashMap<>(); - - putAll.put(key, val); - } - } - - updateAll(cache, rmvAll, putAll); - } - } - - /** - * Batched updater. Updates cache using batch operations thus is dead lock prone. - */ - private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { - assert cache != null; - assert !F.isEmpty(entries); - - Map<K, V> putAll = null; - Set<K> rmvAll = null; - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key instanceof Comparable; - - V val = entry.getValue(); - - if (val == null) { - if (rmvAll == null) - rmvAll = new TreeSet<>(); - - rmvAll.add(key); - } - else { - if (putAll == null) - putAll = new TreeMap<>(); - - putAll.put(key, val); - } - } - - updateAll(cache, rmvAll, putAll); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java deleted file mode 100644 index 8aa554a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Job to put entries to cache on affinity node. - */ -class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> { - /** */ - private final GridKernalContext ctx; - - /** */ - private final IgniteLogger log; - - /** Cache name. */ - private final String cacheName; - - /** Entries to put. */ - private final Collection<Map.Entry<K, V>> col; - - /** {@code True} to ignore deployment ownership. */ - private final boolean ignoreDepOwnership; - - /** */ - private final boolean skipStore; - - /** */ - private final IgniteDataStreamer.Updater<K, V> updater; - - /** - * @param ctx Context. - * @param log Log. - * @param cacheName Cache name. - * @param col Entries to put. - * @param ignoreDepOwnership {@code True} to ignore deployment ownership. - * @param updater Updater. - */ - GridDataLoadUpdateJob( - GridKernalContext ctx, - IgniteLogger log, - @Nullable String cacheName, - Collection<Map.Entry<K, V>> col, - boolean ignoreDepOwnership, - boolean skipStore, - IgniteDataStreamer.Updater<K, V> updater) { - this.ctx = ctx; - this.log = log; - - assert col != null && !col.isEmpty(); - assert updater != null; - - this.cacheName = cacheName; - this.col = col; - this.ignoreDepOwnership = ignoreDepOwnership; - this.skipStore = skipStore; - this.updater = updater; - } - - /** {@inheritDoc} */ - @Override public Object call() throws Exception { - if (log.isDebugEnabled()) - log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']'); - -// TODO IGNITE-77: restore adapter usage. -// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); -// -// IgniteFuture<?> f = cache.context().preloader().startFuture(); -// -// if (!f.isDone()) -// f.get(); -// -// if (ignoreDepOwnership) -// cache.context().deploy().ignoreOwnership(true); - - IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName); - - if (skipStore) - cache = (IgniteCacheProxy<K, V>)cache.withSkipStore(); - - if (ignoreDepOwnership) - cache.context().deploy().ignoreOwnership(true); - - try { - updater.update(cache, col); - - return null; - } - finally { - if (ignoreDepOwnership) - cache.context().deploy().ignoreOwnership(false); - - if (log.isDebugEnabled()) - log.debug("Update job finished on node: " + ctx.localNodeId()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java new file mode 100644 index 0000000..1742041 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Bundled factory for cache updaters. + */ +public class IgniteDataStreamerCacheUpdaters { + /** */ + private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual(); + + /** */ + private static final IgniteDataStreamer.Updater BATCHED = new Batched(); + + /** */ + private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted(); + + /** + * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance + * is not the best. + * + * @return Single updater. + */ + public static <K, V> IgniteDataStreamer.Updater<K, V> individual() { + return INDIVIDUAL; + } + + /** + * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting + * updated concurrently. Performance is generally better than in {@link #individual()}. + * + * @return Batched updater. + */ + public static <K, V> IgniteDataStreamer.Updater<K, V> batched() { + return BATCHED; + } + + /** + * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates + * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}. + * + * @return Batched sorted updater. + */ + public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() { + return BATCHED_SORTED; + } + + /** + * Updates cache. + * + * @param cache Cache. + * @param rmvCol Keys to remove. + * @param putMap Entries to put. + * @throws IgniteException If failed. + */ + protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol, + Map<K, V> putMap) { + assert rmvCol != null || putMap != null; + + // Here we assume that there are no key duplicates, so the following calls are valid. + if (rmvCol != null) + ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol); + + if (putMap != null) + cache.putAll(putMap); + } + + /** + * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone. + */ + private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + assert cache != null; + assert !F.isEmpty(entries); + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key != null; + + V val = entry.getValue(); + + if (val == null) + cache.remove(key); + else + cache.put(key, val); + } + } + } + + /** + * Batched updater. Updates cache using batch operations thus is dead lock prone. + */ + private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + assert cache != null; + assert !F.isEmpty(entries); + + Map<K, V> putAll = null; + Set<K> rmvAll = null; + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key != null; + + V val = entry.getValue(); + + if (val == null) { + if (rmvAll == null) + rmvAll = new HashSet<>(); + + rmvAll.add(key); + } + else { + if (putAll == null) + putAll = new HashMap<>(); + + putAll.put(key, val); + } + } + + updateAll(cache, rmvAll, putAll); + } + } + + /** + * Batched updater. Updates cache using batch operations thus is dead lock prone. + */ + private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { + assert cache != null; + assert !F.isEmpty(entries); + + Map<K, V> putAll = null; + Set<K> rmvAll = null; + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key instanceof Comparable; + + V val = entry.getValue(); + + if (val == null) { + if (rmvAll == null) + rmvAll = new TreeSet<>(); + + rmvAll.add(key); + } + else { + if (putAll == null) + putAll = new TreeMap<>(); + + putAll.put(key, val); + } + } + + updateAll(cache, rmvAll, putAll); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java index 80d08c8..1231e27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java @@ -302,7 +302,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D if (node == null) throw new IgniteException("Failed to get node for cache: " + cacheName); - updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER; + updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER; } /** {@inheritDoc} */ @@ -450,7 +450,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D assert key != null; if (initPda) { - jobPda = new DataLoaderPda(key, entry.getValue(), updater); + jobPda = new DataStreamerPda(key, entry.getValue(), updater); initPda = false; } @@ -981,7 +981,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D if (isLocNode) { fut = ctx.closure().callLocalSafe( - new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false); + new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false); locFuts.add(fut); @@ -1193,7 +1193,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D /** * Data streamer peer-deploy aware. */ - private class DataLoaderPda implements GridPeerDeployAware { + private class DataStreamerPda implements GridPeerDeployAware { /** */ private static final long serialVersionUID = 0L; @@ -1211,7 +1211,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D * * @param objs Collection of objects to detect deploy class and class loader. */ - private DataLoaderPda(Object... objs) { + private DataStreamerPda(Object... objs) { this.objs = Arrays.asList(objs); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java index 69ea440..7db41e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java @@ -248,7 +248,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter { return; } - GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx, + IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx, log, req.cacheName(), col, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java new file mode 100644 index 0000000..1a3db40 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Job to put entries to cache on affinity node. + */ +class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> { + /** */ + private final GridKernalContext ctx; + + /** */ + private final IgniteLogger log; + + /** Cache name. */ + private final String cacheName; + + /** Entries to put. */ + private final Collection<Map.Entry<K, V>> col; + + /** {@code True} to ignore deployment ownership. */ + private final boolean ignoreDepOwnership; + + /** */ + private final boolean skipStore; + + /** */ + private final IgniteDataStreamer.Updater<K, V> updater; + + /** + * @param ctx Context. + * @param log Log. + * @param cacheName Cache name. + * @param col Entries to put. + * @param ignoreDepOwnership {@code True} to ignore deployment ownership. + * @param updater Updater. + */ + IgniteDataStreamerUpdateJob( + GridKernalContext ctx, + IgniteLogger log, + @Nullable String cacheName, + Collection<Map.Entry<K, V>> col, + boolean ignoreDepOwnership, + boolean skipStore, + IgniteDataStreamer.Updater<K, V> updater) { + this.ctx = ctx; + this.log = log; + + assert col != null && !col.isEmpty(); + assert updater != null; + + this.cacheName = cacheName; + this.col = col; + this.ignoreDepOwnership = ignoreDepOwnership; + this.skipStore = skipStore; + this.updater = updater; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + if (log.isDebugEnabled()) + log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']'); + +// TODO IGNITE-77: restore adapter usage. +// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); +// +// IgniteFuture<?> f = cache.context().preloader().startFuture(); +// +// if (!f.isDone()) +// f.get(); +// +// if (ignoreDepOwnership) +// cache.context().deploy().ignoreOwnership(true); + + IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName); + + if (skipStore) + cache = (IgniteCacheProxy<K, V>)cache.withSkipStore(); + + if (ignoreDepOwnership) + cache.context().deploy().ignoreOwnership(true); + + try { + updater.update(cache, col); + + return null; + } + finally { + if (ignoreDepOwnership) + cache.context().deploy().ignoreOwnership(false); + + if (log.isDebugEnabled()) + log.debug("Update job finished on node: " + ctx.localNodeId()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index d585352..15309bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -314,7 +314,7 @@ public class IgfsDataManager extends IgfsManager { if (cfg.getPerNodeParallelBatchCount() > 0) ldr.perNodeParallelLoadOperations(cfg.getPerNodeParallelBatchCount()); - ldr.updater(GridDataLoadCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted()); + ldr.updater(IgniteDataStreamerCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted()); return ldr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java index b0d8625..306e615 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java @@ -36,7 +36,7 @@ import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** - * Tests for {@code GridDataLoaderImpl}. + * Tests for {@code IgniteDataStreamerImpl}. */ public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest { /** IP finder. */ @@ -69,7 +69,7 @@ public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testNullPointerExceptionUponDataLoaderClosing() throws Exception { + public void testNullPointerExceptionUponDataStreamerClosing() throws Exception { try { startGrids(5); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java index b3dd71b..22a1f97 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java @@ -140,7 +140,7 @@ public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest { final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null); ldr.perNodeBufferSize(8192); - ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted()); + ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted()); ldr.autoFlushFrequency(0); final LongAdder cnt = new LongAdder(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java index 8eefebf..23a46e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java @@ -124,7 +124,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest public void testPartitioned() throws Exception { mode = PARTITIONED; - checkDataLoader(); + checkDataStreamer(); } /** @@ -134,7 +134,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest mode = PARTITIONED; nearEnabled = false; - checkDataLoader(); + checkDataStreamer(); } /** @@ -143,7 +143,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest public void testReplicated() throws Exception { mode = REPLICATED; - checkDataLoader(); + checkDataStreamer(); } /** @@ -153,7 +153,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest mode = LOCAL; try { - checkDataLoader(); + checkDataStreamer(); assert false; } @@ -167,7 +167,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest * @throws Exception If failed. */ @SuppressWarnings("ErrorNotRethrown") - private void checkDataLoader() throws Exception { + private void checkDataStreamer() throws Exception { try { Ignite g1 = startGrid(1); @@ -178,7 +178,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); - ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted()); + ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); final AtomicInteger idxGen = new AtomicInteger(); final int cnt = 400; @@ -220,7 +220,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null); - rmvLdr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted()); + rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted()); final CountDownLatch l2 = new CountDownLatch(threads); @@ -265,7 +265,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest public void testPartitionedIsolated() throws Exception { mode = PARTITIONED; - checkIsolatedDataLoader(); + checkIsolatedDataStreamer(); } /** @@ -274,13 +274,13 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest public void testReplicatedIsolated() throws Exception { mode = REPLICATED; - checkIsolatedDataLoader(); + checkIsolatedDataStreamer(); } /** * @throws Exception If failed. */ - private void checkIsolatedDataLoader() throws Exception { + private void checkIsolatedDataStreamer() throws Exception { try { useCache = true; @@ -418,7 +418,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest // Get and configure loader. final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null); - ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>individual()); + ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual()); ldr.perNodeBufferSize(2); // Define count of puts.