# ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9b33b651 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9b33b651 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9b33b651 Branch: refs/heads/ignite-394 Commit: 9b33b6510f5b82c30c8e75a66eb328b00bc425e4 Parents: 6909cc4 Author: Artem Shutak <ashu...@gridgain.com> Authored: Tue Mar 3 21:40:19 2015 +0300 Committer: Artem Shutak <ashu...@gridgain.com> Committed: Tue Mar 3 21:40:19 2015 +0300 ---------------------------------------------------------------------- .../datagrid/CacheDataLoaderExample.java | 6 +- .../datagrid/CachePopularNumbersExample.java | 4 +- .../src/main/java/org/apache/ignite/Ignite.java | 6 +- .../org/apache/ignite/IgniteDataLoader.java | 379 ----- .../org/apache/ignite/IgniteDataStreamer.java | 379 +++++ .../apache/ignite/internal/IgniteKernal.java | 2 +- .../processors/cache/GridCacheAdapter.java | 10 +- .../GridDistributedCacheAdapter.java | 4 +- .../dataload/GridDataLoadCacheUpdaters.java | 18 +- .../dataload/GridDataLoadUpdateJob.java | 4 +- .../dataload/GridDataLoaderFuture.java | 4 +- .../dataload/GridDataLoaderProcessor.java | 16 +- .../dataload/IgniteDataLoaderImpl.java | 1453 ------------------ .../dataload/IgniteDataStreamerImpl.java | 1453 ++++++++++++++++++ .../dr/GridDrDataLoadCacheUpdater.java | 2 +- .../processors/igfs/IgfsDataManager.java | 10 +- ...iteTxConsistencyRestartAbstractSelfTest.java | 2 +- ...idCachePartitionedHitsAndMissesSelfTest.java | 4 +- .../GridCacheLruNearEvictionPolicySelfTest.java | 2 +- ...heNearOnlyLruNearEvictionPolicySelfTest.java | 2 +- .../dataload/GridDataLoaderImplSelfTest.java | 5 +- .../dataload/GridDataLoaderPerformanceTest.java | 2 +- .../GridDataLoaderProcessorSelfTest.java | 27 +- .../loadtests/colocation/GridTestMain.java | 2 +- .../loadtests/discovery/GridGcTimeoutTest.java | 2 +- 
.../mapper/GridContinuousMapperLoadTest1.java | 2 +- .../mapper/GridContinuousMapperLoadTest2.java | 2 +- .../ignite/testframework/junits/IgniteMock.java | 4 +- .../scala/org/apache/ignite/scalar/scalar.scala | 8 +- .../org/apache/ignite/IgniteSpringBean.java | 2 +- .../cache/IgniteSqlQueryBenchmark.java | 2 +- .../cache/IgniteSqlQueryJoinBenchmark.java | 2 +- 32 files changed, 1909 insertions(+), 1911 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java index 57b0cd2..4cdbfd4 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java @@ -21,8 +21,8 @@ import org.apache.ignite.*; import org.apache.ignite.examples.*; /** - * Demonstrates how cache can be populated with data utilizing {@link IgniteDataLoader} API. - * {@link IgniteDataLoader} is a lot more efficient to use than standard + * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API. + * {@link IgniteDataStreamer} is a lot more efficient to use than standard * {@code put(...)} operation as it properly buffers cache requests * together and properly manages load on remote nodes. * <p> @@ -63,7 +63,7 @@ public class CacheDataLoaderExample { long start = System.currentTimeMillis(); - try (IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) { + try (IgniteDataStreamer<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) { // Configure loader. 
ldr.perNodeBufferSize(1024); ldr.perNodeParallelLoadOperations(8); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java index 0f71681..1fc737b 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java @@ -92,7 +92,7 @@ public class CachePopularNumbersExample { * @throws IgniteException If failed. */ private static void streamData(final Ignite ignite) throws IgniteException { - try (IgniteDataLoader<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) { + try (IgniteDataStreamer<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) { // Set larger per-node buffer size since our state is relatively small. ldr.perNodeBufferSize(2048); @@ -140,7 +140,7 @@ public class CachePopularNumbersExample { /** * Increments value for key. */ - private static class IncrementingUpdater implements IgniteDataLoader.Updater<Integer, Long> { + private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> { /** */ private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() { @Override public Void process(MutableEntry<Integer, Long> e, Object... 
args) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 8851d8f..44d4ba9 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -42,7 +42,7 @@ import java.util.concurrent.*; * In addition to {@link ClusterGroup} functionality, from here you can get the following: * <ul> * <li>{@link org.apache.ignite.cache.GridCache} - functionality for in-memory distributed cache.</li> - * <li>{@link IgniteDataLoader} - functionality for loading data large amounts of data into cache.</li> + * <li>{@link IgniteDataStreamer} - functionality for loading data large amounts of data into cache.</li> * <li>{@link IgniteFs} - functionality for distributed Hadoop-compliant in-memory file system and map-reduce.</li> * <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries and indexes into rolling windows.</li> * <li>{@link IgniteScheduler} - functionality for scheduling jobs using UNIX Cron syntax.</li> @@ -205,12 +205,12 @@ public interface Ignite extends AutoCloseable { /** * Gets a new instance of data loader associated with given cache name. Data loader * is responsible for loading external data into in-memory data grid. For more information - * refer to {@link IgniteDataLoader} documentation. + * refer to {@link IgniteDataStreamer} documentation. * * @param cacheName Cache name ({@code null} for default cache). * @return Data loader. 
*/ - public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName); + public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName); /** * Gets an instance of IGFS - Ignite In-Memory File System, if one is not http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java deleted file mode 100644 index 3cff287..0000000 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite; - -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Data loader is responsible for loading external data into cache. 
It achieves it by - * properly buffering updates and properly mapping keys to nodes responsible for the data - * to make sure that there is the least amount of data movement possible and optimal - * network and memory utilization. - * <p> - * Note that loader will load data concurrently by multiple internal threads, so the - * data may get to remote nodes in different order from which it was added to - * the loader. - * <p> - * Also note that {@code GridDataLoader} is not the only way to load data into cache. - * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)} - * method to load data from underlying data store. You can also use standard - * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most - * likely will not perform as well as this class for loading data. And finally, - * data can be loaded from underlying data store on demand, whenever it is accessed - - * for this no explicit data loading step is needed. - * <p> - * {@code IgniteDataLoader} supports the following configuration properties: - * <ul> - * <li> - * {@link #perNodeBufferSize(int)} - when entries are added to data loader via - * {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right - * away and are buffered internally for better performance and network utilization. - * This setting controls the size of internal per-node buffer before buffered data - * is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE} - * value. - * </li> - * <li> - * {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added - * to the data loader via {@link #addData(Object, Object)} method faster than it can - * be put in cache. In this case, new buffered load messages are sent to remote nodes - * before responses from previous ones are received. This could cause unlimited heap - * memory utilization growth on local and remote nodes. 
To control memory utilization, - * this setting limits maximum allowed number of parallel buffered load messages that - * are being processed on remote nodes. If this number is exceeded, then - * {@link #addData(Object, Object)} method will block to control memory utilization. - * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value. - * </li> - * <li> - * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially, - * this is the time after which the loader will make an attempt to submit all data - * added so far to remote nodes. Note that there is no guarantee that data will be - * delivered after this concrete attempt (e.g., it can fail when topology is - * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}). - * </li> - * <li> - * {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent - * updates and allow data loader choose most optimal concurrent implementation. - * </li> - * <li> - * {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries. - * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way. - * </li> - * <li> - * {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes - * loaded by a data loader must be class-loadable from the same class-loader. - * Ignite will make the best effort to detect the most suitable class-loader - * for data loading. However, in complex cases, where compound or deeply nested - * class-loaders are used, it is best to specify a deploy class which can be any - * class loaded by the class-loader for given data. - * </li> - * </ul> - */ -public interface IgniteDataLoader<K, V> extends AutoCloseable { - /** Default max concurrent put operations count. */ - public static final int DFLT_MAX_PARALLEL_OPS = 16; - - /** Default per node buffer size. 
*/ - public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024; - - /** - * Name of cache to load data to. - * - * @return Cache name or {@code null} for default cache. - */ - public String cacheName(); - - /** - * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache. - * Default is {@code true}. - * - * @return Flag value. - */ - public boolean allowOverwrite(); - - /** - * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache. - * Should not be used when custom cache updater set using {@link #updater(IgniteDataLoader.Updater)} method. - * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store. - * - * @param allowOverwrite Flag value. - * @throws IgniteException If failed. - */ - public void allowOverwrite(boolean allowOverwrite) throws IgniteException; - - /** - * Gets flag indicating that write-through behavior should be disabled for data loading. - * Default is {@code false}. - * - * @return Skip store flag. - */ - public boolean skipStore(); - - /** - * Sets flag indicating that write-through behavior should be disabled for data loading. - * Default is {@code false}. - * - * @param skipStore Skip store flag. - */ - public void skipStore(boolean skipStore); - - /** - * Gets size of per node key-value pairs buffer. - * - * @return Per node buffer size. - */ - public int perNodeBufferSize(); - - /** - * Sets size of per node key-value pairs buffer. - * <p> - * This method should be called prior to {@link #addData(Object, Object)} call. - * <p> - * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}. - * - * @param bufSize Per node buffer size. - */ - public void perNodeBufferSize(int bufSize); - - /** - * Gets maximum number of parallel load operations for a single node. - * - * @return Maximum number of parallel load operations for a single node. 
- */ - public int perNodeParallelLoadOperations(); - - /** - * Sets maximum number of parallel load operations for a single node. - * <p> - * This method should be called prior to {@link #addData(Object, Object)} call. - * <p> - * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}. - * - * @param parallelOps Maximum number of parallel load operations for a single node. - */ - public void perNodeParallelLoadOperations(int parallelOps); - - /** - * Gets automatic flush frequency. Essentially, this is the time after which the - * loader will make an attempt to submit all data added so far to remote nodes. - * Note that there is no guarantee that data will be delivered after this concrete - * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway. - * <p> - * If set to {@code 0}, automatic flush is disabled. - * <p> - * Automatic flush is disabled by default (default value is {@code 0}). - * - * @return Flush frequency or {@code 0} if automatic flush is disabled. - * @see #flush() - */ - public long autoFlushFrequency(); - - /** - * Sets automatic flush frequency. Essentially, this is the time after which the - * loader will make an attempt to submit all data added so far to remote nodes. - * Note that there is no guarantee that data will be delivered after this concrete - * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway. - * <p> - * If set to {@code 0}, automatic flush is disabled. - * <p> - * Automatic flush is disabled by default (default value is {@code 0}). - * - * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush. - * @see #flush() - */ - public void autoFlushFrequency(long autoFlushFreq); - - /** - * Gets future for this loading process. This future completes whenever method - * {@link #close(boolean)} completes. By attaching listeners to this future - * it is possible to get asynchronous notifications for completion of this - * loading process. 
- * - * @return Future for this loading process. - */ - public IgniteFuture<?> future(); - - /** - * Optional deploy class for peer deployment. All classes loaded by a data loader - * must be class-loadable from the same class-loader. Ignite will make the best - * effort to detect the most suitable class-loader for data loading. However, - * in complex cases, where compound or deeply nested class-loaders are used, - * it is best to specify a deploy class which can be any class loaded by - * the class-loader for given data. - * - * @param depCls Any class loaded by the class-loader for given data. - */ - public void deployClass(Class<?> depCls); - - /** - * Sets custom cache updater to this data loader. - * - * @param updater Cache updater. - */ - public void updater(Updater<K, V> updater); - - /** - * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}. - * - * @param key Key. - * @return Future fo this operation. - * @throws IgniteException If failed to map key to node. - * @throws IgniteInterruptedException If thread has been interrupted. - * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - */ - public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException; - - /** - * Adds data for loading on remote node. This method can be called from multiple - * threads in parallel to speed up loading if needed. - * <p> - * Note that loader will load data concurrently by multiple internal threads, so the - * data may get to remote nodes in different order from which it was added to - * the loader. - * - * @param key Key. - * @param val Value or {@code null} if respective entry must be removed from cache. - * @return Future fo this operation. - * @throws IgniteException If failed to map key to node. - * @throws IgniteInterruptedException If thread has been interrupted. 
- * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - */ - public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException, - IllegalStateException; - - /** - * Adds data for loading on remote node. This method can be called from multiple - * threads in parallel to speed up loading if needed. - * <p> - * Note that loader will load data concurrently by multiple internal threads, so the - * data may get to remote nodes in different order from which it was added to - * the loader. - * - * @param entry Entry. - * @return Future fo this operation. - * @throws IgniteException If failed to map key to node. - * @throws IgniteInterruptedException If thread has been interrupted. - * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - */ - public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException, - IllegalStateException; - - /** - * Adds data for loading on remote node. This method can be called from multiple - * threads in parallel to speed up loading if needed. - * <p> - * Note that loader will load data concurrently by multiple internal threads, so the - * data may get to remote nodes in different order from which it was added to - * the loader. - * - * @param entries Collection of entries to be loaded. - * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - * @return Future for this load operation. - */ - public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException; - - /** - * Adds data for loading on remote node. This method can be called from multiple - * threads in parallel to speed up loading if needed. 
- * <p> - * Note that loader will load data concurrently by multiple internal threads, so the - * data may get to remote nodes in different order from which it was added to - * the loader. - * - * @param entries Map to be loaded. - * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - * @return Future for this load operation. - */ - public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException; - - /** - * Loads any remaining data, but doesn't close the loader. Data can be still added after - * flush is finished. This method blocks and doesn't allow to add any data until all data - * is loaded. - * <p> - * If another thread is already performing flush, this method will block, wait for - * another thread to complete flush and exit. If you don't want to wait in this case, - * use {@link #tryFlush()} method. - * - * @throws IgniteException If failed to map key to node. - * @throws IgniteInterruptedException If thread has been interrupted. - * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - * @see #tryFlush() - */ - public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException; - - /** - * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush}, - * with the difference that it won't wait and will exit immediately. - * - * @throws IgniteException If failed to map key to node. - * @throws IgniteInterruptedException If thread has been interrupted. - * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - * @see #flush() - */ - public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException; - - /** - * Loads any remaining data and closes this loader. 
- * - * @param cancel {@code True} to cancel ongoing loading operations. - * @throws IgniteException If failed to map key to node. - * @throws IgniteInterruptedException If thread has been interrupted. - */ - public void close(boolean cancel) throws IgniteException, IgniteInterruptedException; - - /** - * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method. - * <p> - * The method is invoked automatically on objects managed by the - * {@code try-with-resources} statement. - * - * @throws IgniteException If failed to close data loader. - * @throws IgniteInterruptedException If thread has been interrupted. - */ - @Override public void close() throws IgniteException, IgniteInterruptedException; - - /** - * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#allowOverwrite(boolean)} - * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best - * performance custom user-defined implementation may help. - * <p> - * Data loader can be configured to use custom implementation of updater instead of default one using - * {@link IgniteDataLoader#updater(IgniteDataLoader.Updater)} method. - */ - interface Updater<K, V> extends Serializable { - /** - * Updates cache with batch of entries. - * - * @param cache Cache. - * @param entries Collection of entries. - * @throws IgniteException If failed. 
- */ - public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java new file mode 100644 index 0000000..c48d61a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Data loader is responsible for loading external data into cache. It achieves it by + * properly buffering updates and properly mapping keys to nodes responsible for the data + * to make sure that there is the least amount of data movement possible and optimal + * network and memory utilization. 
+ * <p> + * Note that loader will load data concurrently by multiple internal threads, so the + * data may get to remote nodes in different order from which it was added to + * the loader. + * <p> + * Also note that {@code GridDataLoader} is not the only way to load data into cache. + * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)} + * method to load data from underlying data store. You can also use standard + * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most + * likely will not perform as well as this class for loading data. And finally, + * data can be loaded from underlying data store on demand, whenever it is accessed - + * for this no explicit data loading step is needed. + * <p> + * {@code IgniteDataLoader} supports the following configuration properties: + * <ul> + * <li> + * {@link #perNodeBufferSize(int)} - when entries are added to data loader via + * {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right + * away and are buffered internally for better performance and network utilization. + * This setting controls the size of internal per-node buffer before buffered data + * is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE} + * value. + * </li> + * <li> + * {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added + * to the data loader via {@link #addData(Object, Object)} method faster than it can + * be put in cache. In this case, new buffered load messages are sent to remote nodes + * before responses from previous ones are received. This could cause unlimited heap + * memory utilization growth on local and remote nodes. To control memory utilization, + * this setting limits maximum allowed number of parallel buffered load messages that + * are being processed on remote nodes. If this number is exceeded, then + * {@link #addData(Object, Object)} method will block to control memory utilization. 
+ * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value. + * </li> + * <li> + * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially, + * this is the time after which the loader will make an attempt to submit all data + * added so far to remote nodes. Note that there is no guarantee that data will be + * delivered after this concrete attempt (e.g., it can fail when topology is + * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}). + * </li> + * <li> + * {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent + * updates and allow data loader choose most optimal concurrent implementation. + * </li> + * <li> + * {@link #updater(IgniteDataStreamer.Updater)} - defines how cache will be updated with loaded entries. + * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way. + * </li> + * <li> + * {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes + * loaded by a data loader must be class-loadable from the same class-loader. + * Ignite will make the best effort to detect the most suitable class-loader + * for data loading. However, in complex cases, where compound or deeply nested + * class-loaders are used, it is best to specify a deploy class which can be any + * class loaded by the class-loader for given data. + * </li> + * </ul> + */ +public interface IgniteDataStreamer<K, V> extends AutoCloseable { + /** Default max concurrent put operations count. */ + public static final int DFLT_MAX_PARALLEL_OPS = 16; + + /** Default per node buffer size. */ + public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024; + + /** + * Name of cache to load data to. + * + * @return Cache name or {@code null} for default cache. 
+ */ + public String cacheName(); + + /** + * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache. + * Default is {@code true}. + * + * @return Flag value. + */ + public boolean allowOverwrite(); + + /** + * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache. + * Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method. + * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store. + * + * @param allowOverwrite Flag value. + * @throws IgniteException If failed. + */ + public void allowOverwrite(boolean allowOverwrite) throws IgniteException; + + /** + * Gets flag indicating that write-through behavior should be disabled for data loading. + * Default is {@code false}. + * + * @return Skip store flag. + */ + public boolean skipStore(); + + /** + * Sets flag indicating that write-through behavior should be disabled for data loading. + * Default is {@code false}. + * + * @param skipStore Skip store flag. + */ + public void skipStore(boolean skipStore); + + /** + * Gets size of per node key-value pairs buffer. + * + * @return Per node buffer size. + */ + public int perNodeBufferSize(); + + /** + * Sets size of per node key-value pairs buffer. + * <p> + * This method should be called prior to {@link #addData(Object, Object)} call. + * <p> + * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}. + * + * @param bufSize Per node buffer size. + */ + public void perNodeBufferSize(int bufSize); + + /** + * Gets maximum number of parallel load operations for a single node. + * + * @return Maximum number of parallel load operations for a single node. + */ + public int perNodeParallelLoadOperations(); + + /** + * Sets maximum number of parallel load operations for a single node. 
+ * <p> + * This method should be called prior to {@link #addData(Object, Object)} call. + * <p> + * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}. + * + * @param parallelOps Maximum number of parallel load operations for a single node. + */ + public void perNodeParallelLoadOperations(int parallelOps); + + /** + * Gets automatic flush frequency. Essentially, this is the time after which the + * loader will make an attempt to submit all data added so far to remote nodes. + * Note that there is no guarantee that data will be delivered after this concrete + * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway. + * <p> + * If set to {@code 0}, automatic flush is disabled. + * <p> + * Automatic flush is disabled by default (default value is {@code 0}). + * + * @return Flush frequency or {@code 0} if automatic flush is disabled. + * @see #flush() + */ + public long autoFlushFrequency(); + + /** + * Sets automatic flush frequency. Essentially, this is the time after which the + * loader will make an attempt to submit all data added so far to remote nodes. + * Note that there is no guarantee that data will be delivered after this concrete + * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway. + * <p> + * If set to {@code 0}, automatic flush is disabled. + * <p> + * Automatic flush is disabled by default (default value is {@code 0}). + * + * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush. + * @see #flush() + */ + public void autoFlushFrequency(long autoFlushFreq); + + /** + * Gets future for this loading process. This future completes whenever method + * {@link #close(boolean)} completes. By attaching listeners to this future + * it is possible to get asynchronous notifications for completion of this + * loading process. + * + * @return Future for this loading process. + */ + public IgniteFuture<?> future(); + + /** + * Optional deploy class for peer deployment. 
All classes loaded by a data loader + * must be class-loadable from the same class-loader. Ignite will make the best + * effort to detect the most suitable class-loader for data loading. However, + * in complex cases, where compound or deeply nested class-loaders are used, + * it is best to specify a deploy class which can be any class loaded by + * the class-loader for given data. + * + * @param depCls Any class loaded by the class-loader for given data. + */ + public void deployClass(Class<?> depCls); + + /** + * Sets custom cache updater to this data loader. + * + * @param updater Cache updater. + */ + public void updater(Updater<K, V> updater); + + /** + * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}. + * + * @param key Key. + * @return Future for this operation. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. + * @throws IllegalStateException If grid has been concurrently stopped or + * {@link #close(boolean)} has already been called on loader. + */ + public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException; + + /** + * Adds data for loading on remote node. This method can be called from multiple + * threads in parallel to speed up loading if needed. + * <p> + * Note that loader will load data concurrently by multiple internal threads, so the + * data may get to remote nodes in different order from which it was added to + * the loader. + * + * @param key Key. + * @param val Value or {@code null} if respective entry must be removed from cache. + * @return Future for this operation. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. + * @throws IllegalStateException If grid has been concurrently stopped or + * {@link #close(boolean)} has already been called on loader. 
+ */ + public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException, + IllegalStateException; + + /** + * Adds data for loading on remote node. This method can be called from multiple + * threads in parallel to speed up loading if needed. + * <p> + * Note that loader will load data concurrently by multiple internal threads, so the + * data may get to remote nodes in different order from which it was added to + * the loader. + * + * @param entry Entry. + * @return Future for this operation. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. + * @throws IllegalStateException If grid has been concurrently stopped or + * {@link #close(boolean)} has already been called on loader. + */ + public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException, + IllegalStateException; + + /** + * Adds data for loading on remote node. This method can be called from multiple + * threads in parallel to speed up loading if needed. + * <p> + * Note that loader will load data concurrently by multiple internal threads, so the + * data may get to remote nodes in different order from which it was added to + * the loader. + * + * @param entries Collection of entries to be loaded. + * @throws IllegalStateException If grid has been concurrently stopped or + * {@link #close(boolean)} has already been called on loader. + * @return Future for this load operation. + */ + public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException; + + /** + * Adds data for loading on remote node. This method can be called from multiple + * threads in parallel to speed up loading if needed. + * <p> + * Note that loader will load data concurrently by multiple internal threads, so the + * data may get to remote nodes in different order from which it was added to + * the loader. 
+ * + * @param entries Map to be loaded. + * @throws IllegalStateException If grid has been concurrently stopped or + * {@link #close(boolean)} has already been called on loader. + * @return Future for this load operation. + */ + public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException; + + /** + * Loads any remaining data, but doesn't close the loader. Data can still be added after + * flush is finished. This method blocks and doesn't allow adding any data until all data + * is loaded. + * <p> + * If another thread is already performing flush, this method will block, wait for + * another thread to complete flush and exit. If you don't want to wait in this case, + * use {@link #tryFlush()} method. + * + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. + * @throws IllegalStateException If grid has been concurrently stopped or + * {@link #close(boolean)} has already been called on loader. + * @see #tryFlush() + */ + public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException; + + /** + * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush}, + * with the difference that it won't wait and will exit immediately. + * + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. + * @throws IllegalStateException If grid has been concurrently stopped or + * {@link #close(boolean)} has already been called on loader. + * @see #flush() + */ + public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException; + + /** + * Loads any remaining data and closes this loader. + * + * @param cancel {@code True} to cancel ongoing loading operations. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. 
+ */ + public void close(boolean cancel) throws IgniteException, IgniteInterruptedException; + + /** + * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method. + * <p> + * The method is invoked automatically on objects managed by the + * {@code try-with-resources} statement. + * + * @throws IgniteException If failed to close data loader. + * @throws IgniteInterruptedException If thread has been interrupted. + */ + @Override public void close() throws IgniteException, IgniteInterruptedException; + + /** + * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataStreamer#allowOverwrite(boolean)} + * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best + * performance custom user-defined implementation may help. + * <p> + * Data loader can be configured to use custom implementation of updater instead of default one using + * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method. + */ + interface Updater<K, V> extends Serializable { + /** + * Updates cache with batch of entries. + * + * @param cache Cache. + * @param entries Collection of entries. + * @throws IgniteException If failed. 
+ */ + public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index f46d071..336f872 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2346,7 +2346,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ - @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) { + @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) { guard(); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 12ea535..6ed5699 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3877,7 +3877,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final ExpiryPolicy plc = plc0 != null ? 
plc0 : ctx.expiry(); if (ctx.store().isLocalStore()) { - IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); + IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); try { ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); @@ -4043,7 +4043,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @throws IgniteCheckedException If failed. */ private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException { - try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { + try (final IgniteDataStreamer<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { ldr.allowOverwrite(true); ldr.skipStore(true); @@ -4086,7 +4086,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry(); if (ctx.store().isLocalStore()) { - IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); + IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); try { ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); @@ -6134,7 +6134,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final Collection<Map.Entry<K, V>> col; /** */ - final IgniteDataLoaderImpl<K, V> ldr; + final IgniteDataStreamerImpl<K, V> ldr; /** */ final ExpiryPolicy plc; @@ -6145,7 +6145,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param plc Optional expiry policy. 
*/ private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, - IgniteDataLoaderImpl<K, V> ldr, + IgniteDataStreamerImpl<K, V> ldr, @Nullable ExpiryPolicy plc) { this.p = p; this.ldr = ldr; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 00190d9..c99efc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -276,8 +276,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter else dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; - try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) { - ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0); + try (IgniteDataStreamer<K, V> dataLdr = ignite.dataLoader(cacheName)) { + ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0); dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java index e2e780b..78a7e62 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java @@ -29,13 +29,13 @@ import java.util.*; */ public class GridDataLoadCacheUpdaters { /** */ - private static final IgniteDataLoader.Updater INDIVIDUAL = new Individual(); + private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual(); /** */ - private static final IgniteDataLoader.Updater BATCHED = new Batched(); + private static final IgniteDataStreamer.Updater BATCHED = new Batched(); /** */ - private static final IgniteDataLoader.Updater BATCHED_SORTED = new BatchedSorted(); + private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted(); /** * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and @@ -44,7 +44,7 @@ public class GridDataLoadCacheUpdaters { * * @return Single updater. */ - public static <K, V> IgniteDataLoader.Updater<K, V> individual() { + public static <K, V> IgniteDataStreamer.Updater<K, V> individual() { return INDIVIDUAL; } @@ -55,7 +55,7 @@ public class GridDataLoadCacheUpdaters { * * @return Batched updater. */ - public static <K, V> IgniteDataLoader.Updater<K, V> batched() { + public static <K, V> IgniteDataStreamer.Updater<K, V> batched() { return BATCHED; } @@ -66,7 +66,7 @@ public class GridDataLoadCacheUpdaters { * * @return Batched sorted updater. */ - public static <K extends Comparable<?>, V> IgniteDataLoader.Updater<K, V> batchedSorted() { + public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() { return BATCHED_SORTED; } @@ -93,7 +93,7 @@ public class GridDataLoadCacheUpdaters { /** * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone. 
*/ - private static class Individual<K, V> implements IgniteDataLoader.Updater<K, V> { + private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> { /** */ private static final long serialVersionUID = 0L; @@ -120,7 +120,7 @@ public class GridDataLoadCacheUpdaters { /** * Batched updater. Updates cache using batch operations thus is dead lock prone. */ - private static class Batched<K, V> implements IgniteDataLoader.Updater<K, V> { + private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> { /** */ private static final long serialVersionUID = 0L; @@ -160,7 +160,7 @@ public class GridDataLoadCacheUpdaters { /** * Batched updater. Updates cache using batch operations thus is dead lock prone. */ - private static class BatchedSorted<K, V> implements IgniteDataLoader.Updater<K, V> { + private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java index 9e2a483..8aa554a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java @@ -48,7 +48,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> { private final boolean skipStore; /** */ - private final IgniteDataLoader.Updater<K, V> updater; + private final IgniteDataStreamer.Updater<K, V> updater; /** * @param ctx Context. 
@@ -65,7 +65,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> { Collection<Map.Entry<K, V>> col, boolean ignoreDepOwnership, boolean skipStore, - IgniteDataLoader.Updater<K, V> updater) { + IgniteDataStreamer.Updater<K, V> updater) { this.ctx = ctx; this.log = log; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java index 5efcfe9..dffa862 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java @@ -34,7 +34,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> { /** Data loader. */ @GridToStringExclude - private IgniteDataLoaderImpl dataLdr; + private IgniteDataStreamerImpl dataLdr; /** * Default constructor for {@link Externalizable} support. @@ -47,7 +47,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> { * @param ctx Context. * @param dataLdr Data loader. 
*/ - GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) { + GridDataLoaderFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) { super(ctx); assert dataLdr != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java index d470d02..b29c9ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java @@ -41,7 +41,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; */ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { /** Loaders map (access is not supposed to be highly concurrent). */ - private Collection<IgniteDataLoaderImpl> ldrs = new GridConcurrentHashSet<>(); + private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>(); /** Busy lock. */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); @@ -50,7 +50,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { private Thread flusher; /** */ - private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new DelayQueue<>(); + private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>(); /** Marshaller. 
*/ private final Marshaller marsh; @@ -80,7 +80,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) { @Override protected void body() throws InterruptedException { while (!isCancelled()) { - IgniteDataLoaderImpl<K, V> ldr = flushQ.take(); + IgniteDataStreamerImpl<K, V> ldr = flushQ.take(); if (!busyLock.enterBusy()) return; @@ -118,7 +118,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { U.interrupt(flusher); U.join(flusher, log); - for (IgniteDataLoaderImpl<?, ?> ldr : ldrs) { + for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) { if (log.isDebugEnabled()) log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']'); @@ -142,12 +142,12 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { * @param compact {@code true} if data loader should transfer data in compact format. * @return Data loader. */ - public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) { + public IgniteDataStreamerImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to create data loader (grid is stopping)."); try { - final IgniteDataLoaderImpl<K, V> ldr = new IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact); + final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact); ldrs.add(ldr); @@ -173,7 +173,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { * @param cacheName Cache name ({@code null} for default cache). * @return Data loader. 
*/ - public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) { + public IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) { return dataLoader(cacheName, true); } @@ -234,7 +234,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { } Collection<Map.Entry<K, V>> col; - IgniteDataLoader.Updater<K, V> updater; + IgniteDataStreamer.Updater<K, V> updater; try { col = marsh.unmarshal(req.collectionBytes(), clsLdr);