# ignite-394: "Data loader" -> "Data streamer" + fix names of some tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/350ec49d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/350ec49d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/350ec49d Branch: refs/heads/ignite-394 Commit: 350ec49d819eb0ba1fd75604a3665f610b1f4e69 Parents: b33ab6a Author: Artem Shutak <ashu...@gridgain.com> Authored: Tue Mar 3 22:00:13 2015 +0300 Committer: Artem Shutak <ashu...@gridgain.com> Committed: Tue Mar 3 22:00:13 2015 +0300 ---------------------------------------------------------------------- .../datagrid/CacheDataLoaderExample.java | 85 -------- .../datagrid/CacheDataStreamerExample.java | 85 ++++++++ .../ignite/examples/CacheExamplesSelfTest.java | 4 +- .../src/main/java/org/apache/ignite/Ignite.java | 4 +- .../org/apache/ignite/IgniteDataStreamer.java | 4 +- .../ignite/internal/GridKernalContext.java | 2 +- .../dataload/IgniteDataStreamerFuture.java | 6 +- .../dataload/IgniteDataStreamerImpl.java | 12 +- .../dataload/IgniteDataStreamerProcessor.java | 6 +- .../internal/processors/dataload/package.html | 2 +- .../processors/streamer/IgniteStreamerImpl.java | 2 +- .../dataload/GridDataLoaderImplSelfTest.java | 214 ------------------- .../dataload/GridDataLoaderPerformanceTest.java | 199 ----------------- .../IgniteDataStreamerImplSelfTest.java | 214 +++++++++++++++++++ .../IgniteDataStreamerPerformanceTest.java | 199 +++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 +- 16 files changed, 520 insertions(+), 520 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java ---------------------------------------------------------------------- diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java deleted file mode 100644 index 8984fdd..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.datagrid; - -import org.apache.ignite.*; -import org.apache.ignite.examples.*; - -/** - * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API. - * {@link IgniteDataStreamer} is a lot more efficient to use than standard - * {@code put(...)} operation as it properly buffers cache requests - * together and properly manages load on remote nodes. - * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. - * <p> - * Alternatively you can run {@link CacheNodeStartup} in another JVM which will - * start node with {@code examples/config/example-cache.xml} configuration. - */ -public class CacheDataLoaderExample { - /** Cache name. 
*/ - private static final String CACHE_NAME = "partitioned"; - - /** Number of entries to load. */ - private static final int ENTRY_COUNT = 500000; - - /** Heap size required to run this example. */ - public static final int MIN_MEMORY = 512 * 1024 * 1024; - - /** - * Executes example. - * - * @param args Command line arguments, none required. - * @throws IgniteException If example execution failed. - */ - public static void main(String[] args) throws IgniteException { - ExamplesUtils.checkMinMemory(MIN_MEMORY); - - try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { - System.out.println(); - System.out.println(">>> Cache data loader example started."); - - // Clean up caches on all nodes before run. - ignite.jcache(CACHE_NAME).clear(); - - System.out.println(); - System.out.println(">>> Cache clear finished."); - - long start = System.currentTimeMillis(); - - try (IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(CACHE_NAME)) { - // Configure loader. - ldr.perNodeBufferSize(1024); - ldr.perNodeParallelLoadOperations(8); - - for (int i = 0; i < ENTRY_COUNT; i++) { - ldr.addData(i, Integer.toString(i)); - - // Print out progress while loading cache. 
- if (i > 0 && i % 10000 == 0) - System.out.println("Loaded " + i + " keys."); - } - } - - long end = System.currentTimeMillis(); - - System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java new file mode 100644 index 0000000..fc1ef78 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datagrid; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; + +/** + * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API. 
+ * {@link IgniteDataStreamer} is a lot more efficient to use than standard + * {@code put(...)} operation as it properly buffers cache requests + * together and properly manages load on remote nodes. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. + * <p> + * Alternatively you can run {@link CacheNodeStartup} in another JVM which will + * start node with {@code examples/config/example-cache.xml} configuration. + */ +public class CacheDataStreamerExample { + /** Cache name. */ + private static final String CACHE_NAME = "partitioned"; + + /** Number of entries to load. */ + private static final int ENTRY_COUNT = 500000; + + /** Heap size required to run this example. */ + public static final int MIN_MEMORY = 512 * 1024 * 1024; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + ExamplesUtils.checkMinMemory(MIN_MEMORY); + + try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { + System.out.println(); + System.out.println(">>> Cache data loader example started."); + + // Clean up caches on all nodes before run. + ignite.jcache(CACHE_NAME).clear(); + + System.out.println(); + System.out.println(">>> Cache clear finished."); + + long start = System.currentTimeMillis(); + + try (IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(CACHE_NAME)) { + // Configure loader. + ldr.perNodeBufferSize(1024); + ldr.perNodeParallelLoadOperations(8); + + for (int i = 0; i < ENTRY_COUNT; i++) { + ldr.addData(i, Integer.toString(i)); + + // Print out progress while loading cache. 
+ if (i > 0 && i % 10000 == 0) + System.out.println("Loaded " + i + " keys."); + } + } + + long end = System.currentTimeMillis(); + + System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java index bd82760..c5c4599 100644 --- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java +++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java @@ -18,9 +18,9 @@ package org.apache.ignite.examples; import org.apache.ignite.examples.datagrid.*; -import org.apache.ignite.examples.datastructures.*; import org.apache.ignite.examples.datagrid.starschema.*; import org.apache.ignite.examples.datagrid.store.*; +import org.apache.ignite.examples.datastructures.*; import org.apache.ignite.testframework.junits.common.*; /** @@ -115,7 +115,7 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest { * @throws Exception If failed. 
*/ public void testCacheDataLoaderExample() throws Exception { - CacheDataLoaderExample.main(EMPTY_ARGS); + CacheDataStreamerExample.main(EMPTY_ARGS); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 343d65d..f13def0 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -203,12 +203,12 @@ public interface Ignite extends AutoCloseable { public IgniteTransactions transactions(); /** - * Gets a new instance of data loader associated with given cache name. Data loader + * Gets a new instance of data loader associated with given cache name. Data streamer * is responsible for loading external data into in-memory data grid. For more information * refer to {@link IgniteDataStreamer} documentation. * * @param cacheName Cache name ({@code null} for default cache). - * @return Data loader. + * @return Data streamer. */ public <K, V> IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index 519a0a0..ca9726d 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -24,7 +24,7 @@ import java.io.*; import java.util.*; /** - * Data loader is responsible for loading external data into cache. 
It achieves it by + * Data streamer is responsible for loading external data into cache. It achieves it by * properly buffering updates and properly mapping keys to nodes responsible for the data * to make sure that there is the least amount of data movement possible and optimal * network and memory utilization. @@ -363,7 +363,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best * performance custom user-defined implementation may help. * <p> - * Data loader can be configured to use custom implementation of updater instead of default one using + * Data streamer can be configured to use custom implementation of updater instead of default one using * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method. */ interface Updater<K, V> extends Serializable { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 53bd9d2..6d502bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -245,7 +245,7 @@ public interface GridKernalContext extends Iterable<GridComponent> { /** * Gets data loader processor. * - * @return Data loader processor. + * @return Data streamer processor. 
*/ public <K, V> IgniteDataStreamerProcessor<K, V> dataStream(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java index e093b37..5730655 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java @@ -26,13 +26,13 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; /** - * Data loader future. + * Data streamer future. */ class IgniteDataStreamerFuture extends GridFutureAdapter<Object> { /** */ private static final long serialVersionUID = 0L; - /** Data loader. */ + /** Data streamer. */ @GridToStringExclude private IgniteDataStreamerImpl dataLdr; @@ -45,7 +45,7 @@ class IgniteDataStreamerFuture extends GridFutureAdapter<Object> { /** * @param ctx Context. - * @param dataLdr Data loader. + * @param dataLdr Data streamer. 
*/ IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) { super(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java index 0476e2c..2e7517b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java @@ -52,7 +52,7 @@ import static org.apache.ignite.internal.GridTopic.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** - * Data loader implementation. + * Data streamer implementation. 
*/ @SuppressWarnings("unchecked") public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { @@ -253,7 +253,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D */ private void enterBusy() { if (!busyLock.enterBusy()) - throw new IllegalStateException("Data loader has been closed."); + throw new IllegalStateException("Data streamer has been closed."); } /** @@ -520,7 +520,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']'); if (cancelled) { - resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " + + resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this, e1)); } else if (remaps + 1 > maxRemapCnt) { @@ -887,7 +887,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D submit(entries0, curFut0); if (cancelled) - curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this)); + curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this)); } return curFut0; @@ -1160,7 +1160,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D * */ void cancelAll() { - IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this); + IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this); for (IgniteInternalFuture<?> f : locFuts) { try { @@ -1191,7 +1191,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D } /** - * Data loader peer-deploy aware. + * Data streamer peer-deploy aware. 
*/ private class DataLoaderPda implements GridPeerDeployAware { /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java index 3b25d17..c01d451 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java @@ -140,7 +140,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter { /** * @param cacheName Cache name ({@code null} for default cache). * @param compact {@code true} if data loader should transfer data in compact format. - * @return Data loader. + * @return Data streamer. */ public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) { if (!busyLock.enterBusy()) @@ -171,7 +171,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter { /** * @param cacheName Cache name ({@code null} for default cache). - * @return Data loader. + * @return Data streamer. 
*/ public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) { return dataStreamer(cacheName, true); @@ -310,7 +310,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); - X.println(">>> Data loader processor memory stats [grid=" + ctx.gridName() + ']'); + X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']'); X.println(">>> ldrsSize: " + ldrs.size()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html index 47052a3..1090b86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html @@ -19,6 +19,6 @@ <html> <body> <!-- Package description. --> - Data loader processor. + Data streamer processor. 
</body> </html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java index 10fd3d8..c70f8e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java @@ -1139,7 +1139,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable { } /** - * Data loader peer-deploy aware. + * Data streamer peer-deploy aware. */ private class StreamerPda implements GridPeerDeployAware { /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java deleted file mode 100644 index 2d1d79d..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; - -/** - * Tests for {@code GridDataLoaderImpl}. - */ -public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of keys to load via data loader. */ - private static final int KEYS_COUNT = 1000; - - /** Started grid counter. 
*/ - private static int cnt; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - // Forth node goes without cache. - if (cnt < 4) - cfg.setCacheConfiguration(cacheConfiguration()); - - cnt++; - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testNullPointerExceptionUponDataLoaderClosing() throws Exception { - try { - startGrids(5); - - final CyclicBarrier barrier = new CyclicBarrier(2); - - multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - U.awaitQuiet(barrier); - - G.stopAll(true); - - return null; - } - }, 1); - - Ignite g4 = grid(4); - - IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null); - - dataLdr.perNodeBufferSize(32); - - for (int i = 0; i < 100000; i += 2) { - dataLdr.addData(i, i); - dataLdr.removeData(i + 1); - } - - U.awaitQuiet(barrier); - - info("Closing data loader."); - - try { - dataLdr.close(true); - } - catch (IllegalStateException ignore) { - // This is ok to ignore this exception as test is racy by it's nature - - // grid is stopping in different thread. - } - } - finally { - G.stopAll(true); - } - } - - /** - * Data loader should correctly load entries from HashMap in case of grids with more than one node - * and with GridOptimizedMarshaller that requires serializable. - * - * @throws Exception If failed. 
- */ - public void testAddDataFromMap() throws Exception { - try { - cnt = 0; - - startGrids(2); - - Ignite g0 = grid(0); - - Marshaller marsh = g0.configuration().getMarshaller(); - - if (marsh instanceof OptimizedMarshaller) - assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable()); - else - fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName()); - - IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null); - - Map<Integer, String> map = U.newHashMap(KEYS_COUNT); - - for (int i = 0; i < KEYS_COUNT; i ++) - map.put(i, String.valueOf(i)); - - dataLdr.addData(map); - - dataLdr.close(); - - Random rnd = new Random(); - - IgniteCache<Integer, String> c = g0.jcache(null); - - for (int i = 0; i < KEYS_COUNT; i ++) { - Integer k = rnd.nextInt(KEYS_COUNT); - - String v = c.get(k); - - assertEquals(k.toString(), v); - } - } - finally { - G.stopAll(true); - } - } - - /** - * Gets cache configuration. - * - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration() { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setBackups(1); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - - return cacheCfg; - } - - /** - * - */ - private static class TestObject implements Serializable { - /** */ - private int val; - - /** - */ - private TestObject() { - // No-op. - } - - /** - * @param val Value. 
- */ - private TestObject(int val) { - this.val = val; - } - - public Integer val() { - return val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj instanceof TestObject && ((TestObject)obj).val == val; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java deleted file mode 100644 index 89a2170..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.util.concurrent.*; - -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.events.EventType.*; - -/** - * Data loader performance test. Compares group lock data loader to traditional lock. - * <p> - * Disable assertions and give at least 2 GB heap to run this test. - */ -public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int GRID_CNT = 3; - - /** */ - private static final int ENTRY_CNT = 80000; - - /** */ - private boolean useCache; - - /** */ - private String[] vals = new String[2048]; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(spi); - - cfg.setIncludeProperties(); - - cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - - cfg.setConnectorConfiguration(null); - - cfg.setPeerClassLoadingEnabled(true); - - if (useCache) { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(PARTITIONED); - - cc.setDistributionMode(PARTITIONED_ONLY); - cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setStartSize(ENTRY_CNT / 
GRID_CNT); - cc.setSwapEnabled(false); - - cc.setBackups(1); - - cc.setStoreValueBytes(true); - - cfg.setCacheSanityCheckEnabled(false); - cfg.setCacheConfiguration(cc); - } - else - cfg.setCacheConfiguration(); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - for (int i = 0; i < vals.length; i++) { - int valLen = ThreadLocalRandom8.current().nextInt(128, 512); - - StringBuilder sb = new StringBuilder(); - - for (int j = 0; j < valLen; j++) - sb.append('a' + ThreadLocalRandom8.current().nextInt(20)); - - vals[i] = sb.toString(); - - info("Value: " + vals[i]); - } - } - - /** - * @throws Exception If failed. - */ - public void testPerformance() throws Exception { - doTest(); - } - - /** - * @throws Exception If failed. - */ - private void doTest() throws Exception { - System.gc(); - System.gc(); - System.gc(); - - try { - useCache = true; - - startGridsMultiThreaded(GRID_CNT); - - useCache = false; - - Ignite ignite = startGrid(); - - final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null); - - ldr.perNodeBufferSize(8192); - ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted()); - ldr.autoFlushFrequency(0); - - final LongAdder cnt = new LongAdder(); - - long start = U.currentTimeMillis(); - - Thread t = new Thread(new Runnable() { - @SuppressWarnings("BusyWait") - @Override public void run() { - while (true) { - try { - Thread.sleep(10000); - } - catch (InterruptedException ignored) { - break; - } - - info(">>> Adds/sec: " + cnt.sumThenReset() / 10); - } - } - }); - - t.setDaemon(true); - - t.start(); - - int threadNum = 2;//Runtime.getRuntime().availableProcessors(); - - multithreaded(new Callable<Object>() { - @SuppressWarnings("InfiniteLoopStatement") - @Override public Object call() throws Exception { - ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); - - while (true) { - int i = rnd.nextInt(ENTRY_CNT); - - ldr.addData(i, 
vals[rnd.nextInt(vals.length)]); - - cnt.increment(); - } - } - }, threadNum, "loader"); - - info("Closing loader..."); - - ldr.close(false); - - long duration = U.currentTimeMillis() - start; - - info("Finished performance test. Duration: " + duration + "ms."); - } - finally { - stopAllGrids(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java new file mode 100644 index 0000000..c84d9db --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Tests for {@code IgniteDataStreamerImpl}. + */ +public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of keys to load via data streamer. */ + private static final int KEYS_COUNT = 1000; + + /** Started grid counter. */ + private static int cnt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + // Only the first four nodes get a cache; nodes started after them go without one. + if (cnt < 4) + cfg.setCacheConfiguration(cacheConfiguration()); + + cnt++; + + return cfg; + } + + /** + * @throws Exception If failed.
+ */ + public void testNullPointerExceptionUponDataLoaderClosing() throws Exception { + try { + startGrids(5); + + final CyclicBarrier barrier = new CyclicBarrier(2); + + multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + U.awaitQuiet(barrier); + + G.stopAll(true); + + return null; + } + }, 1); + + Ignite g4 = grid(4); + + IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null); + + dataLdr.perNodeBufferSize(32); + + for (int i = 0; i < 100000; i += 2) { + dataLdr.addData(i, i); + dataLdr.removeData(i + 1); + } + + U.awaitQuiet(barrier); + + info("Closing data loader."); + + try { + dataLdr.close(true); + } + catch (IllegalStateException ignore) { + // It is ok to ignore this exception as the test is racy by its nature - + // the grid is being stopped in a different thread. + } + } + finally { + G.stopAll(true); + } + } + + /** + * Data streamer should correctly load entries from HashMap in case of grids with more than one node + * and with GridOptimizedMarshaller that requires serializable. + * + * @throws Exception If failed.
+ */ + public void testAddDataFromMap() throws Exception { + try { + cnt = 0; + + startGrids(2); + + Ignite g0 = grid(0); + + Marshaller marsh = g0.configuration().getMarshaller(); + + if (marsh instanceof OptimizedMarshaller) + assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable()); + else + fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName()); + + IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null); + + Map<Integer, String> map = U.newHashMap(KEYS_COUNT); + + for (int i = 0; i < KEYS_COUNT; i ++) + map.put(i, String.valueOf(i)); + + dataLdr.addData(map); + + dataLdr.close(); + + Random rnd = new Random(); + + IgniteCache<Integer, String> c = g0.jcache(null); + + for (int i = 0; i < KEYS_COUNT; i ++) { + Integer k = rnd.nextInt(KEYS_COUNT); + + String v = c.get(k); + + assertEquals(k.toString(), v); + } + } + finally { + G.stopAll(true); + } + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setBackups(1); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + return cacheCfg; + } + + /** + * + */ + private static class TestObject implements Serializable { + /** */ + private int val; + + /** + */ + private TestObject() { + // No-op. + } + + /** + * @param val Value. 
+ */ + private TestObject(int val) { + this.val = val; + } + + public Integer val() { + return val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof TestObject && ((TestObject)obj).val == val; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java new file mode 100644 index 0000000..5f18df8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.EventType.*; + +/** + * Data streamer performance test. Compares group lock data loader to traditional lock. + * <p> + * Disable assertions and give at least 2 GB heap to run this test. + */ +public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final int ENTRY_CNT = 80000; + + /** */ + private boolean useCache; + + /** */ + private String[] vals = new String[2048]; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setConnectorConfiguration(null); + + cfg.setPeerClassLoadingEnabled(true); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + + cc.setDistributionMode(PARTITIONED_ONLY); + cc.setWriteSynchronizationMode(FULL_SYNC); + 
cc.setStartSize(ENTRY_CNT / GRID_CNT); + cc.setSwapEnabled(false); + + cc.setBackups(1); + + cc.setStoreValueBytes(true); + + cfg.setCacheSanityCheckEnabled(false); + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + for (int i = 0; i < vals.length; i++) { + int valLen = ThreadLocalRandom8.current().nextInt(128, 512); + + StringBuilder sb = new StringBuilder(); + + for (int j = 0; j < valLen; j++) + sb.append('a' + ThreadLocalRandom8.current().nextInt(20)); + + vals[i] = sb.toString(); + + info("Value: " + vals[i]); + } + } + + /** + * @throws Exception If failed. + */ + public void testPerformance() throws Exception { + doTest(); + } + + /** + * @throws Exception If failed. + */ + private void doTest() throws Exception { + System.gc(); + System.gc(); + System.gc(); + + try { + useCache = true; + + startGridsMultiThreaded(GRID_CNT); + + useCache = false; + + Ignite ignite = startGrid(); + + final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null); + + ldr.perNodeBufferSize(8192); + ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted()); + ldr.autoFlushFrequency(0); + + final LongAdder cnt = new LongAdder(); + + long start = U.currentTimeMillis(); + + Thread t = new Thread(new Runnable() { + @SuppressWarnings("BusyWait") + @Override public void run() { + while (true) { + try { + Thread.sleep(10000); + } + catch (InterruptedException ignored) { + break; + } + + info(">>> Adds/sec: " + cnt.sumThenReset() / 10); + } + } + }); + + t.setDaemon(true); + + t.start(); + + int threadNum = 2;//Runtime.getRuntime().availableProcessors(); + + multithreaded(new Callable<Object>() { + @SuppressWarnings("InfiniteLoopStatement") + @Override public Object call() throws Exception { + ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); + + while (true) { + int i = 
rnd.nextInt(ENTRY_CNT); + + ldr.addData(i, vals[rnd.nextInt(vals.length)]); + + cnt.increment(); + } + } + }, threadNum, "loader"); + + info("Closing loader..."); + + ldr.close(false); + + long duration = U.currentTimeMillis() - start; + + info("Finished performance test. Duration: " + duration + "ms."); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index a30eea5..30285a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -110,7 +110,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheAffinityApiSelfTest.class); suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class); suite.addTestSuite(IgniteDataStreamerProcessorSelfTest.class); - suite.addTestSuite(GridDataLoaderImplSelfTest.class); + suite.addTestSuite(IgniteDataStreamerImplSelfTest.class); suite.addTestSuite(GridCacheEntryMemorySizeSelfTest.class); suite.addTestSuite(GridCacheClearAllSelfTest.class); suite.addTestSuite(GridCacheObjectToStringSelfTest.class);