# ignite-394: renaming in IgniteDataStreamer ("load" terminology replaced with "stream")
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/82996bc2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/82996bc2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/82996bc2 Branch: refs/heads/sprint-2 Commit: 82996bc2688e989fcf4b0056ae4355c16b173ad8 Parents: a8f1738 Author: Artem Shutak <ashu...@gridgain.com> Authored: Wed Mar 4 19:56:10 2015 +0300 Committer: Artem Shutak <ashu...@gridgain.com> Committed: Wed Mar 4 19:56:10 2015 +0300 ---------------------------------------------------------------------- .../datagrid/CacheDataStreamerExample.java | 2 +- .../org/apache/ignite/IgniteDataStreamer.java | 122 +++++++++---------- .../datastream/IgniteDataStreamerImpl.java | 4 +- .../processors/igfs/IgfsDataManager.java | 2 +- ...idCachePartitionedHitsAndMissesSelfTest.java | 2 +- 5 files changed, 66 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82996bc2/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java index 1193f27..73a36a6 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java @@ -66,7 +66,7 @@ public class CacheDataStreamerExample { try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(CACHE_NAME)) { // Configure loader. 
stmr.perNodeBufferSize(1024); - stmr.perNodeParallelLoadOperations(8); + stmr.perNodeParallelStreamOperations(8); for (int i = 0; i < ENTRY_COUNT; i++) { stmr.addData(i, Integer.toString(i)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82996bc2/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index 2e54225..c7758fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -24,22 +24,22 @@ import java.io.*; import java.util.*; /** - * Data streamer is responsible for loading external data into cache. It achieves it by + * Data streamer is responsible for streaming external data into cache. It achieves it by * properly buffering updates and properly mapping keys to nodes responsible for the data * to make sure that there is the least amount of data movement possible and optimal * network and memory utilization. * <p> - * Note that loader will load data concurrently by multiple internal threads, so the + * Note that streamer will stream data concurrently by multiple internal threads, so the * data may get to remote nodes in different order from which it was added to - * the loader. + * the streamer. * <p> - * Also note that {@code IgniteDataStreamer} is not the only way to load data into cache. + * Also note that {@code IgniteDataStreamer} is not the only way to add data into cache. * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)} - * method to load data from underlying data store. You can also use standard + * method to add data from underlying data store. 
You can also use standard * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most - * likely will not perform as well as this class for loading data. And finally, - * data can be loaded from underlying data store on demand, whenever it is accessed - - * for this no explicit data loading step is needed. + * likely will not perform as well as this class for adding data. And finally, + * data can be added from underlying data store on demand, whenever it is accessed - + * for this no explicit data adding step is needed. * <p> * {@code IgniteDataStreamer} supports the following configuration properties: * <ul> @@ -52,19 +52,19 @@ import java.util.*; * value. * </li> * <li> - * {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added + * {@link #perNodeParallelStreamOperations(int)} - sometimes data may be added * to the data streamer via {@link #addData(Object, Object)} method faster than it can - * be put in cache. In this case, new buffered load messages are sent to remote nodes + * be put in cache. In this case, new buffered stream messages are sent to remote nodes * before responses from previous ones are received. This could cause unlimited heap * memory utilization growth on local and remote nodes. To control memory utilization, - * this setting limits maximum allowed number of parallel buffered load messages that + * this setting limits maximum allowed number of parallel buffered stream messages that * are being processed on remote nodes. If this number is exceeded, then * {@link #addData(Object, Object)} method will block to control memory utilization. * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value. * </li> * <li> * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially, - * this is the time after which the loader will make an attempt to submit all data + * this is the time after which the streamer will make an attempt to submit all data * added so far to remote nodes. 
Note that there is no guarantee that data will be * delivered after this concrete attempt (e.g., it can fail when topology is * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}). @@ -74,12 +74,12 @@ import java.util.*; * updates and allow data streamer choose most optimal concurrent implementation. * </li> * <li> - * {@link #updater(IgniteDataStreamer.Updater)} - defines how cache will be updated with loaded entries. + * {@link #updater(IgniteDataStreamer.Updater)} - defines how cache will be updated with added entries. * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way. * </li> * <li> * {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes - * loaded by a data streamer must be class-loadable from the same class-loader. + * streamed by a data streamer must be class-loadable from the same class-loader. * Ignite will make the best effort to detect the most suitable class-loader * for data loading. However, in complex cases, where compound or deeply nested * class-loaders are used, it is best to specify a deploy class which can be any @@ -95,7 +95,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024; /** - * Name of cache to load data to. + * Name of cache to stream data to. * * @return Cache name or {@code null} for default cache. */ @@ -120,7 +120,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public void allowOverwrite(boolean allowOverwrite) throws IgniteException; /** - * Gets flag indicating that write-through behavior should be disabled for data loading. + * Gets flag indicating that write-through behavior should be disabled for data streaming. * Default is {@code false}. * * @return Skip store flag. 
@@ -128,7 +128,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public boolean skipStore(); /** - * Sets flag indicating that write-through behavior should be disabled for data loading. + * Sets flag indicating that write-through behavior should be disabled for data streaming. * Default is {@code false}. * * @param skipStore Skip store flag. @@ -154,26 +154,26 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public void perNodeBufferSize(int bufSize); /** - * Gets maximum number of parallel load operations for a single node. + * Gets maximum number of parallel stream operations for a single node. * - * @return Maximum number of parallel load operations for a single node. + * @return Maximum number of parallel stream operations for a single node. */ - public int perNodeParallelLoadOperations(); + public int perNodeParallelStreamOperations(); /** - * Sets maximum number of parallel load operations for a single node. + * Sets maximum number of parallel stream operations for a single node. * <p> * This method should be called prior to {@link #addData(Object, Object)} call. * <p> * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}. * - * @param parallelOps Maximum number of parallel load operations for a single node. + * @param parallelOps Maximum number of parallel stream operations for a single node. */ - public void perNodeParallelLoadOperations(int parallelOps); + public void perNodeParallelStreamOperations(int parallelOps); /** * Gets automatic flush frequency. Essentially, this is the time after which the - * loader will make an attempt to submit all data added so far to remote nodes. + * streamer will make an attempt to submit all data added so far to remote nodes. * Note that there is no guarantee that data will be delivered after this concrete * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway. 
* <p> @@ -188,7 +188,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { /** * Sets automatic flush frequency. Essentially, this is the time after which the - * loader will make an attempt to submit all data added so far to remote nodes. + * streamer will make an attempt to submit all data added so far to remote nodes. * Note that there is no guarantee that data will be delivered after this concrete * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway. * <p> @@ -202,17 +202,17 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public void autoFlushFrequency(long autoFlushFreq); /** - * Gets future for this loading process. This future completes whenever method + * Gets future for this streaming process. This future completes whenever method * {@link #close(boolean)} completes. By attaching listeners to this future * it is possible to get asynchronous notifications for completion of this - * loading process. + * streaming process. * - * @return Future for this loading process. + * @return Future for this streaming process. */ public IgniteFuture<?> future(); /** - * Optional deploy class for peer deployment. All classes loaded by a data streamer + * Optional deploy class for peer deployment. All classes added by a data streamer * must be class-loadable from the same class-loader. Ignite will make the best * effort to detect the most suitable class-loader for data loading. However, * in complex cases, where compound or deeply nested class-loaders are used, @@ -238,17 +238,17 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @throws IgniteException If failed to map key to node. * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. + * {@link #close(boolean)} has already been called on streamer. 
*/ public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException; /** - * Adds data for loading on remote node. This method can be called from multiple - * threads in parallel to speed up loading if needed. + * Adds data for streaming on remote node. This method can be called from multiple + * threads in parallel to speed up streaming if needed. * <p> - * Note that loader will load data concurrently by multiple internal threads, so the + * Note that streamer will stream data concurrently by multiple internal threads, so the * data may get to remote nodes in different order from which it was added to - * the loader. + * the streamer. * <p> * Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default) * then data streamer will not overwrite existing cache entries for better performance @@ -260,19 +260,19 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @throws IgniteException If failed to map key to node. * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. + * {@link #close(boolean)} has already been called on streamer. * @see #allowOverwrite() */ public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException, IllegalStateException; /** - * Adds data for loading on remote node. This method can be called from multiple - * threads in parallel to speed up loading if needed. + * Adds data for streaming on remote node. This method can be called from multiple + * threads in parallel to speed up streaming if needed. 
* <p> - * Note that loader will load data concurrently by multiple internal threads, so the + * Note that streamer will stream data concurrently by multiple internal threads, so the * data may get to remote nodes in different order from which it was added to - * the loader. + * the streamer. * <p> * Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default) * then data streamer will not overwrite existing cache entries for better performance @@ -283,56 +283,56 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @throws IgniteException If failed to map key to node. * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. + * {@link #close(boolean)} has already been called on streamer. * @see #allowOverwrite() */ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException, IllegalStateException; /** - * Adds data for loading on remote node. This method can be called from multiple - * threads in parallel to speed up loading if needed. + * Adds data for streaming on remote node. This method can be called from multiple + * threads in parallel to speed up streaming if needed. * <p> - * Note that loader will load data concurrently by multiple internal threads, so the + * Note that streamer will stream data concurrently by multiple internal threads, so the * data may get to remote nodes in different order from which it was added to - * the loader. + * the streamer. * <p> * Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default) * then data streamer will not overwrite existing cache entries for better performance * (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true}) * - * @param entries Collection of entries to be loaded. 
+ * @param entries Collection of entries to be streamed. * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - * @return Future for this load operation. + * {@link #close(boolean)} has already been called on streamer. + * @return Future for this stream operation. * @see #allowOverwrite() */ public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException; /** - * Adds data for loading on remote node. This method can be called from multiple - * threads in parallel to speed up loading if needed. + * Adds data for streaming on remote node. This method can be called from multiple + * threads in parallel to speed up streaming if needed. * <p> - * Note that loader will load data concurrently by multiple internal threads, so the + * Note that streamer will stream data concurrently by multiple internal threads, so the * data may get to remote nodes in different order from which it was added to - * the loader. + * the streamer. * <p> * Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default) * then data streamer will not overwrite existing cache entries for better performance * (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true}) * - * @param entries Map to be loaded. + * @param entries Map to be streamed. * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. - * @return Future for this load operation. + * {@link #close(boolean)} has already been called on streamer. + * @return Future for this stream operation. * @see #allowOverwrite() */ public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException; /** - * Loads any remaining data, but doesn't close the loader. Data can be still added after + * Streams any remaining data, but doesn't close the streamer. 
Data can be still added after * flush is finished. This method blocks and doesn't allow to add any data until all data - * is loaded. + * is streamed. * <p> * If another thread is already performing flush, this method will block, wait for * another thread to complete flush and exit. If you don't want to wait in this case, @@ -341,27 +341,27 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @throws IgniteException If failed to map key to node. * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. + * {@link #close(boolean)} has already been called on streamer. * @see #tryFlush() */ public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException; /** - * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush}, + * Makes an attempt to stream remaining data. This method is mostly similar to {@link #flush}, * with the difference that it won't wait and will exit immediately. * * @throws IgniteException If failed to map key to node. * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or - * {@link #close(boolean)} has already been called on loader. + * {@link #close(boolean)} has already been called on streamer. * @see #flush() */ public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException; /** - * Loads any remaining data and closes this loader. + * Streams any remaining data and closes this streamer. * - * @param cancel {@code True} to cancel ongoing loading operations. + * @param cancel {@code True} to cancel ongoing streaming operations. * @throws IgniteException If failed to map key to node. * @throws IgniteInterruptedException If thread has been interrupted. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82996bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java index 47f72ee..faba034 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java @@ -349,12 +349,12 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D } /** {@inheritDoc} */ - @Override public int perNodeParallelLoadOperations() { + @Override public int perNodeParallelStreamOperations() { return parallelOps; } /** {@inheritDoc} */ - @Override public void perNodeParallelLoadOperations(int parallelOps) { + @Override public void perNodeParallelStreamOperations(int parallelOps) { this.parallelOps = parallelOps; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82996bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 6c922bc..bfe5a8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -312,7 +312,7 @@ public class IgfsDataManager extends IgfsManager { ldr.perNodeBufferSize(cfg.getPerNodeBatchSize()); if 
(cfg.getPerNodeParallelBatchCount() > 0) - ldr.perNodeParallelLoadOperations(cfg.getPerNodeParallelBatchCount()); + ldr.perNodeParallelStreamOperations(cfg.getPerNodeParallelBatchCount()); ldr.updater(IgniteDataStreamerCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82996bc2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java index 683935c..26d50f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java @@ -142,7 +142,7 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac private static void realTimePopulate(final Ignite g) { try (IgniteDataStreamer<Integer, Long> ldr = g.dataStreamer(null)) { // Sets max values to 1 so cache metrics have correct values. - ldr.perNodeParallelLoadOperations(1); + ldr.perNodeParallelStreamOperations(1); // Count closure which increments a count on remote node. ldr.updater(new IncrementingUpdater());