http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridEventConsumeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridEventConsumeSelfTest.java deleted file mode 100644 index 564c876..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridEventConsumeSelfTest.java +++ /dev/null @@ -1,1079 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.IgniteEventType.*; -import static org.gridgain.grid.kernal.processors.continuous.GridContinuousProcessor.*; - -/** - * Event consume test. - */ -public class GridEventConsumeSelfTest extends GridCommonAbstractTest { - /** */ - private static final String PRJ_PRED_CLS_NAME = "org.gridgain.grid.tests.p2p.GridEventConsumeProjectionPredicate"; - - /** */ - private static final String FILTER_CLS_NAME = "org.gridgain.grid.tests.p2p.GridEventConsumeFilter"; - - /** Grids count. */ - private static final int GRID_CNT = 3; - - /** Number of created consumes per thread in multithreaded test. */ - private static final int CONSUME_CNT = 500; - - /** Consume latch. */ - private static volatile CountDownLatch consumeLatch; - - /** Consume counter. */ - private static volatile AtomicInteger consumeCnt; - - /** Include node flag. */ - private boolean include; - - /** No automatic unsubscribe flag. */ - private boolean noAutoUnsubscribe; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - if (include) - cfg.setUserAttributes(F.asMap("include", true)); - - cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - assertTrue(GRID_CNT > 1); - - include = true; - - startGridsMultiThreaded(GRID_CNT - 1); - - include = false; - - startGrid(GRID_CNT - 1); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - assertEquals(GRID_CNT, grid(0).nodes().size()); - - for (int i = 0; i < GRID_CNT; i++) { - GridKernal grid = (GridKernal)grid(i); - - GridContinuousProcessor proc = grid.context().continuous(); - - if (noAutoUnsubscribe) { - localRoutines(proc).clear(); - - U.<Map>field(proc, "rmtInfos").clear(); - } - - assertEquals(0, localRoutines(proc).size()); - assertEquals(0, U.<Map>field(proc, "rmtInfos").size()); - assertEquals(0, U.<Map>field(proc, "startFuts").size()); - assertEquals(0, U.<Map>field(proc, "waitForStartAck").size()); - assertEquals(0, U.<Map>field(proc, "stopFuts").size()); - assertEquals(0, U.<Map>field(proc, "waitForStopAck").size()); - assertEquals(0, U.<Map>field(proc, "pending").size()); - } - } - - /** - * @param proc Continuous processor. - * @return Local event routines. - */ - private Collection<LocalRoutineInfo> localRoutines(GridContinuousProcessor proc) { - return F.view(U.<Map<UUID, LocalRoutineInfo>>field(proc, "locInfos").values(), - new IgnitePredicate<LocalRoutineInfo>() { - @Override public boolean apply(LocalRoutineInfo info) { - return info.handler().isForEvents(); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testApi() throws Exception { - try { - grid(0).events().stopRemoteListen(null); - } - catch (NullPointerException ignored) { - // No-op. - } - - grid(0).events().stopRemoteListen(UUID.randomUUID()); - - UUID consumeId = null; - - try { - consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteDiscoveryEvent>() { - @Override public boolean apply(UUID uuid, IgniteDiscoveryEvent evt) { - return false; - } - }, - new P1<IgniteDiscoveryEvent>() { - @Override public boolean apply(IgniteDiscoveryEvent e) { - return false; - } - }, - EVTS_DISCOVERY - ); - - assertNotNull(consumeId); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - - try { - consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteDiscoveryEvent>() { - @Override public boolean apply(UUID uuid, IgniteDiscoveryEvent evt) { - return false; - } - }, - new P1<IgniteDiscoveryEvent>() { - @Override public boolean apply(IgniteDiscoveryEvent e) { - return false; - } - } - ); - - assertNotNull(consumeId); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - - try { - consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID uuid, IgniteEvent evt) { - return false; - } - }, - new P1<IgniteEvent>() { - @Override public boolean apply(IgniteEvent e) { - return false; - } - } - ); - - assertNotNull(consumeId); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testAllEvents() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT); - - UUID consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - if (evt.type() == EVT_JOB_STARTED) { - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - } - - return true; - } - }, - null - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT, nodeIds.size()); - assertEquals(GRID_CNT, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testEventsByType() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT); - - UUID consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - null, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT, nodeIds.size()); - assertEquals(GRID_CNT, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testEventsByFilter() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT); - - UUID consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - new P1<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - return evt.type() == EVT_JOB_STARTED; - } - } - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT, nodeIds.size()); - assertEquals(GRID_CNT, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testEventsByTypeAndFilter() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT); - - UUID consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteJobEvent>() { - @Override public boolean apply(UUID nodeId, IgniteJobEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - new P1<IgniteJobEvent>() { - @Override public boolean apply(IgniteJobEvent evt) { - return !"exclude".equals(evt.taskName()); - } - }, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - grid(0).compute().withName("exclude").run(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT, nodeIds.size()); - assertEquals(GRID_CNT, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testRemoteProjection() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1); - - UUID consumeId = events(grid(0).forRemotes()).remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - null, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT - 1, nodeIds.size()); - assertEquals(GRID_CNT - 1, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testProjectionWithLocalNode() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1); - - UUID consumeId = events(grid(0).forAttribute("include", null)).remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - null, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT - 1, nodeIds.size()); - assertEquals(GRID_CNT - 1, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testLocalNodeOnly() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); - - UUID consumeId = events(grid(0).forLocal()).remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - null, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(1, nodeIds.size()); - assertEquals(1, cnt.get()); - - assertEquals(grid(0).localNode().id(), F.first(nodeIds)); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testEmptyProjection() throws Exception { - try { - events(grid(0).forPredicate(F.<ClusterNode>alwaysFalse())).remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - return true; - } - }, - null - ); - - assert false : "Exception was not thrown."; - } - catch (IgniteCheckedException e) { - assertTrue(e.getMessage().startsWith( - "Failed to register remote continuous listener (projection is empty).")); - } - } - - /** - * @throws Exception If failed. - */ - public void testStopByCallback() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); - - UUID consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return false; - } - }, - null, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(1, nodeIds.size()); - assertEquals(1, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testStopRemoteListen() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); - - UUID consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - null, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().run(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(1, nodeIds.size()); - assertEquals(1, cnt.get()); - - grid(0).events().stopRemoteListen(consumeId); - - grid(0).compute().run(F.noop()); - - U.sleep(500); - - assertEquals(1, nodeIds.size()); - assertEquals(1, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testStopLocalListenByCallback() throws Exception { - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); - - grid(0).events().localListen( - new P1<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - info("Local event [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - cnt.incrementAndGet(); - latch.countDown(); - - return false; - } - }, - EVT_JOB_STARTED); - - compute(grid(0).forLocal()).run(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(1, cnt.get()); - - compute(grid(0).forLocal()).run(F.noop()); - - U.sleep(500); - - assertEquals(1, cnt.get()); - } - - /** - * @throws Exception If failed. - */ - public void testNodeJoin() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1); - - UUID consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - new P1<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - return evt.type() == EVT_JOB_STARTED; - } - }, - EVT_JOB_STARTED, EVT_JOB_FINISHED - ); - - try { - assertNotNull(consumeId); - - include = true; - - startGrid("anotherGrid"); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT + 1, nodeIds.size()); - assertEquals(GRID_CNT + 1, cnt.get()); - } - finally { - stopGrid("anotherGrid"); - - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testNodeJoinWithProjection() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT); - - UUID consumeId = events(grid(0).forAttribute("include", null)).remoteListen( - new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - null, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - include = true; - - startGrid("anotherGrid1"); - - include = false; - - startGrid("anotherGrid2"); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT, nodeIds.size()); - assertEquals(GRID_CNT, cnt.get()); - } - finally { - stopGrid("anotherGrid1"); - stopGrid("anotherGrid2"); - - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - // TODO: GG-6730 - public void _testNodeJoinWithP2P() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1); - - ClassLoader ldr = getExternalClassLoader(); - - IgnitePredicate<ClusterNode> prjPred = (IgnitePredicate<ClusterNode>)ldr.loadClass(PRJ_PRED_CLS_NAME).newInstance(); - IgnitePredicate<IgniteEvent> filter = (IgnitePredicate<IgniteEvent>)ldr.loadClass(FILTER_CLS_NAME).newInstance(); - - UUID consumeId = events(grid(0).forPredicate(prjPred)).remoteListen(new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, filter, EVT_JOB_STARTED); - - try { - assertNotNull(consumeId); - - startGrid("anotherGrid"); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT + 1, nodeIds.size()); - assertEquals(GRID_CNT + 1, cnt.get()); - } - finally { - stopGrid("anotherGrid1"); - stopGrid("anotherGrid2"); - - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testResources() throws Exception { - final Collection<UUID> nodeIds = new HashSet<>(); - final AtomicInteger cnt = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT); - - UUID consumeId = grid(0).events().remoteListen( - new P2<UUID, IgniteEvent>() { - @IgniteInstanceResource - private Ignite grid; - - @Override public boolean apply(UUID nodeId, IgniteEvent evt) { - info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); - - assertEquals(EVT_JOB_STARTED, evt.type()); - assertNotNull(grid); - - nodeIds.add(nodeId); - cnt.incrementAndGet(); - latch.countDown(); - - return true; - } - }, - new P1<IgniteEvent>() { - @IgniteInstanceResource - private Ignite grid; - - @Override public boolean apply(IgniteEvent evt) { - assertNotNull(grid); - - return true; - } - }, - EVT_JOB_STARTED - ); - - try { - assertNotNull(consumeId); - - grid(0).compute().broadcast(F.noop()); - - assert latch.await(2, SECONDS); - - assertEquals(GRID_CNT, nodeIds.size()); - assertEquals(GRID_CNT, cnt.get()); - } - finally { - grid(0).events().stopRemoteListen(consumeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testMasterNodeLeave() throws Exception { - Ignite g = startGrid("anotherGrid"); - - final UUID nodeId = g.cluster().localNode().id(); - final CountDownLatch latch = new CountDownLatch(GRID_CNT); - - for (int i = 0; i < GRID_CNT; i++) { - grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - if (nodeId.equals(((IgniteDiscoveryEvent) evt).eventNode().id())) - latch.countDown(); - - return true; - } - }, EVT_NODE_LEFT); - } - - g.events().remoteListen( - null, - new P1<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - return true; - } - }, - EVTS_ALL - ); - - stopGrid("anotherGrid"); - - latch.await(); - } - - /** - * @throws Exception If failed. - */ - public void testMasterNodeLeaveNoAutoUnsubscribe() throws Exception { - Ignite g = startGrid("anotherGrid"); - - final UUID nodeId = g.cluster().localNode().id(); - final CountDownLatch discoLatch = new CountDownLatch(GRID_CNT); - - for (int i = 0; i < GRID_CNT; i++) { - grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - if (nodeId.equals(((IgniteDiscoveryEvent) evt).eventNode().id())) - discoLatch.countDown(); - - return true; - } - }, EVT_NODE_LEFT); - } - - consumeLatch = new CountDownLatch(GRID_CNT * 2 + 1); - consumeCnt = new AtomicInteger(); - - noAutoUnsubscribe = true; - - g.events().remoteListen( - 1, 0, false, - null, - new P1<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - consumeLatch.countDown(); - consumeCnt.incrementAndGet(); - - return true; - } - }, - EVT_JOB_STARTED - ); - - grid(0).compute().broadcast(F.noop()); - - stopGrid("anotherGrid"); - - discoLatch.await(); - - grid(0).compute().broadcast(F.noop()); - - assert consumeLatch.await(2, SECONDS); - - assertEquals(GRID_CNT * 2 + 1, consumeCnt.get()); - } - - /** - * @throws Exception If failed. - */ - public void testMultithreadedWithNodeRestart() throws Exception { - final AtomicBoolean stop = new AtomicBoolean(); - final BlockingQueue<IgniteBiTuple<Integer, UUID>> queue = new LinkedBlockingQueue<>(); - final Collection<UUID> started = new GridConcurrentHashSet<>(); - final Collection<UUID> stopped = new GridConcurrentHashSet<>(); - - final Random rnd = new Random(); - - IgniteFuture<?> starterFut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - for (int i = 0; i < CONSUME_CNT; i++) { - int idx = rnd.nextInt(GRID_CNT); - - try { - IgniteEvents evts = grid(idx).events().enableAsync(); - - evts.remoteListen(new P2<UUID, IgniteEvent>() { - @Override public boolean apply(UUID uuid, IgniteEvent evt) { - return true; - } - }, null, EVT_JOB_STARTED); - - UUID consumeId = evts.<UUID>future().get(3000); - - started.add(consumeId); - - queue.add(F.t(idx, consumeId)); - } - catch (ClusterTopologyException ignored) { - // No-op. - } - - U.sleep(10); - } - - stop.set(true); - - return null; - } - }, 8, "consume-starter"); - - IgniteFuture<?> stopperFut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!stop.get()) { - IgniteBiTuple<Integer, UUID> t = queue.poll(1, SECONDS); - - if (t == null) - continue; - - int idx = t.get1(); - UUID consumeId = t.get2(); - - try { - IgniteEvents evts = grid(idx).events().enableAsync(); - - evts.stopRemoteListen(consumeId); - - evts.future().get(3000); - - stopped.add(consumeId); - } - catch (ClusterTopologyException ignored) { - // No-op. - } - } - - return null; - } - }, 4, "consume-stopper"); - - IgniteFuture<?> nodeRestarterFut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!stop.get()) { - startGrid("anotherGrid"); - stopGrid("anotherGrid"); - } - - return null; - } - }, 1, "node-restarter"); - - IgniteFuture<?> jobRunnerFut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!stop.get()) { - int idx = rnd.nextInt(GRID_CNT); - - try { - IgniteCompute comp = grid(idx).compute().enableAsync(); - - comp.run(F.noop()); - - comp.future().get(3000); - } - catch (IgniteCheckedException ignored) { - // Ignore all job execution related errors. - } - } - - return null; - } - }, 1, "job-runner"); - - starterFut.get(); - stopperFut.get(); - nodeRestarterFut.get(); - jobRunnerFut.get(); - - IgniteBiTuple<Integer, UUID> t; - - while ((t = queue.poll()) != null) { - int idx = t.get1(); - UUID consumeId = t.get2(); - - IgniteEvents evts = grid(idx).events().enableAsync(); - - evts.stopRemoteListen(consumeId); - - evts.future().get(3000); - - stopped.add(consumeId); - } - - Collection<UUID> notStopped = F.lose(started, true, stopped); - - assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size()); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridMessageListenSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridMessageListenSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridMessageListenSelfTest.java deleted file mode 100644 index 5b986e1..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridMessageListenSelfTest.java +++ /dev/null @@ -1,489 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.messaging.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.*; -import org.gridgain.testframework.*; -import org.gridgain.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; - -/** - * Message listen test. - */ -public class GridMessageListenSelfTest extends GridCommonAbstractTest { - /** */ - private static final int GRID_CNT = 3; - - /** */ - private static final String INC_ATTR = "include"; - - /** */ - private static final String MSG = "Message"; - - /** */ - private static final String TOPIC = "Topic"; - - /** */ - private static final int MSG_CNT = 3; - - /** */ - private static final String TOPIC_CLS_NAME = "org.gridgain.grid.tests.p2p.GridTestMessageTopic"; - - /** */ - private static final String LSNR_CLS_NAME = "org.gridgain.grid.tests.p2p.GridTestMessageListener"; - - /** */ - private static boolean include; - - /** */ - private static final List<UUID> allNodes = new ArrayList<>(); - - /** */ - private static final List<UUID> rmtNodes = new ArrayList<>(); - - /** */ - private static final List<UUID> incNodes = new ArrayList<>(); - - /** */ - private static final Collection<UUID> nodes = new GridConcurrentHashSet<>(); - - /** */ - private static final AtomicInteger cnt = new AtomicInteger(); - - /** */ - private static CountDownLatch latch; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - if (include) - cfg.setUserAttributes(F.asMap(INC_ATTR, true)); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - nodes.clear(); - cnt.set(0); - - include = true; - - startGridsMultiThreaded(GRID_CNT - 1); - - include = false; - - Thread.sleep(500); - - startGrid(GRID_CNT - 1); - - allNodes.clear(); - rmtNodes.clear(); - incNodes.clear(); - - for (int i = 0; i < GRID_CNT; i++) { - UUID id = grid(i).localNode().id(); - - allNodes.add(id); - - if (i != 0) - rmtNodes.add(id); - - if (i != GRID_CNT - 1) - incNodes.add(id); - } - - Collections.sort(allNodes); - Collections.sort(rmtNodes); - Collections.sort(incNodes); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testNullTopic() throws Exception { - latch = new CountDownLatch(MSG_CNT * GRID_CNT); - - listen(grid(0), null, true); - - send(); - - assert latch.await(2, SECONDS); - - Thread.sleep(500); - - assertEquals(MSG_CNT * GRID_CNT, cnt.get()); - - checkNodes(allNodes); - } - - /** - * @throws Exception If failed. - */ - public void testNonNullTopic() throws Exception { - latch = new CountDownLatch(MSG_CNT * GRID_CNT); - - listen(grid(0), null, true); - - send(); - - assert latch.await(2, SECONDS); - - Thread.sleep(500); - - assertEquals(MSG_CNT * GRID_CNT, cnt.get()); - - checkNodes(allNodes); - } - - /** - * @throws Exception If failed. - */ - public void testStopListen() throws Exception { - latch = new CountDownLatch(GRID_CNT); - - listen(grid(0), null, false); - - send(); - - assert latch.await(2, SECONDS); - - Thread.sleep(500); - - int expCnt = cnt.get(); - - send(); - - Thread.sleep(1000); - - assertEquals(expCnt, cnt.get()); - - checkNodes(allNodes); - } - - /** - * @throws Exception If failed. - */ - public void testProjection() throws Exception { - latch = new CountDownLatch(MSG_CNT * (GRID_CNT - 1)); - - listen(grid(0).forRemotes(), null, true); - - send(); - - assert latch.await(2, SECONDS); - - Thread.sleep(500); - - assertEquals(MSG_CNT * (GRID_CNT - 1), cnt.get()); - - checkNodes(rmtNodes); - } - - /** - * @throws Exception If failed. - */ - public void testNodeJoin() throws Exception { - latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1)); - - listen(grid(0), null, true); - - try { - Ignite g = startGrid("anotherGrid"); - - send(); - - assert latch.await(2, SECONDS); - - Thread.sleep(500); - - assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get()); - - List<UUID> allNodes0 = new ArrayList<>(allNodes); - - allNodes0.add(g.cluster().localNode().id()); - - Collections.sort(allNodes0); - - checkNodes(allNodes0); - } - finally { - stopGrid("anotherGrid"); - } - } - - /** - * @throws Exception If failed. - */ - public void testNodeJoinWithProjection() throws Exception { - latch = new CountDownLatch(MSG_CNT * GRID_CNT); - - listen(grid(0).forAttribute(INC_ATTR, null), null, true); - - try { - include = true; - - Ignite g = startGrid("anotherGrid1"); - - include = false; - - startGrid("anotherGrid2"); - - send(); - - assert latch.await(2, SECONDS); - - Thread.sleep(500); - - assertEquals(MSG_CNT * GRID_CNT, cnt.get()); - - List<UUID> incNodes0 = new ArrayList<>(incNodes); - - incNodes0.add(g.cluster().localNode().id()); - - Collections.sort(incNodes0); - - checkNodes(incNodes0); - } - finally { - stopGrid("anotherGrid1"); - stopGrid("anotherGrid2"); - } - } - - /** - * @throws Exception If failed. - */ - public void testNullTopicWithDeployment() throws Exception { - Class<?> cls = getExternalClassLoader().loadClass(LSNR_CLS_NAME); - - grid(0).message().remoteListen(null, (IgniteBiPredicate<UUID, Object>)cls.newInstance()); - - send(); - - boolean s = GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return checkDeployedListeners(GRID_CNT); - } - }, 2000); - - assertTrue(s); - } - - /** - * @throws Exception If failed. - */ - public void testNonNullTopicWithDeployment() throws Exception { - ClassLoader ldr = getExternalClassLoader(); - - Class<?> topicCls = ldr.loadClass(TOPIC_CLS_NAME); - Class<?> lsnrCls = ldr.loadClass(LSNR_CLS_NAME); - - Object topic = topicCls.newInstance(); - - grid(0).message().remoteListen(topic, (IgniteBiPredicate<UUID, Object>)lsnrCls.newInstance()); - - send(topic); - - boolean s = GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return checkDeployedListeners(GRID_CNT); - } - }, 2000); - - assertTrue(s); - } - - /** - * @throws Exception If failed. - */ - public void testListenActor() throws Exception { - latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1)); - - grid(0).message().remoteListen(null, new Actor(grid(0))); - - try { - Ignite g = startGrid("anotherGrid"); - - send(); - - assert latch.await(2, SECONDS); - - Thread.sleep(500); - - assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get()); - - List<UUID> allNodes0 = new ArrayList<>(allNodes); - - allNodes0.add(g.cluster().localNode().id()); - - Collections.sort(allNodes0); - - checkNodes(allNodes0); - } - finally { - stopGrid("anotherGrid"); - } - } - - /** - * @param prj Projection. - * @param topic Topic. - * @param ret Value returned from listener. - * @throws Exception In case of error. - */ - private void listen(final ClusterGroup prj, @Nullable Object topic, final boolean ret) throws Exception { - assert prj != null; - - message(prj).remoteListen(topic, new Listener(prj, ret)); - } - - /** - * @throws Exception In case of error. - */ - private void send() throws Exception { - send(TOPIC); - } - - /** - * @param topic Non-null topic. - * @throws Exception In case of error. - */ - private void send(Object topic) throws Exception { - assert topic != null; - - for (int i = 0; i < MSG_CNT; i++) - grid(0).message().send(null, MSG); - - for (int i = 0; i < MSG_CNT; i++) - grid(0).message().send(topic, MSG); - } - - /** - * @param expCnt Expected messages count. - * @return If check passed. - */ - private boolean checkDeployedListeners(int expCnt) { - for (Ignite g : G.allGrids()) { - AtomicInteger cnt = g.cluster().<String, AtomicInteger>nodeLocalMap().get("msgCnt"); - - if (cnt == null || cnt.get() != expCnt) - return false; - } - - return true; - } - - /** - * @param expNodes Expected nodes. - */ - private void checkNodes(List<UUID> expNodes) { - List<UUID> nodes0 = new ArrayList<>(nodes); - - Collections.sort(nodes0); - - assertEquals(expNodes, nodes0); - } - - /** */ - private static class Listener implements P2<UUID, Object> { - /** */ - private final ClusterGroup prj; - - /** */ - private final boolean ret; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** - * @param prj Projection. - * @param ret Return value. - */ - private Listener(ClusterGroup prj, boolean ret) { - this.prj = prj; - this.ret = ret; - } - - /** {@inheritDoc} */ - @Override public boolean apply(UUID nodeId, Object msg) { - assertNotNull(ignite); - assertNotNull(ignite.configuration().getNodeId()); - - X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + ignite.cluster().localNode().id() + ']'); - - assertEquals(prj.ignite().cluster().localNode().id(), nodeId); - assertEquals(MSG, msg); - - nodes.add(ignite.configuration().getNodeId()); - cnt.incrementAndGet(); - latch.countDown(); - - return ret; - } - } - - /** */ - private static class Actor extends MessagingListenActor<Object> { - /** */ - private final ClusterGroup prj; - - /** - * @param prj Projection. - */ - private Actor(ClusterGroup prj) { - this.prj = prj; - } - - /** {@inheritDoc} */ - @Override protected void receive(UUID nodeId, Object msg) throws Throwable { - assertNotNull(ignite()); - - UUID locNodeId = ignite().cluster().localNode().id(); - - X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + locNodeId + ']'); - - assertEquals(prj.ignite().cluster().localNode().id(), nodeId); - assertEquals(MSG, msg); - - nodes.add(locNodeId); - cnt.incrementAndGet(); - latch.countDown(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java deleted file mode 100644 index 0f6576b..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - * Tests for {@code GridDataLoaderImpl}. - */ -public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of keys to load via data loader. */ - private static final int KEYS_COUNT = 1000; - - /** Started grid counter. */ - private static int cnt; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - // Forth node goes without cache. - if (cnt < 4) - cfg.setCacheConfiguration(cacheConfiguration()); - - cnt++; - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testNullPointerExceptionUponDataLoaderClosing() throws Exception { - try { - startGrids(5); - - final CyclicBarrier barrier = new CyclicBarrier(2); - - multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - U.awaitQuiet(barrier); - - G.stopAll(true); - - return null; - } - }, 1); - - Ignite g4 = grid(4); - - IgniteDataLoader<Object, Object> dataLdr = g4.dataLoader(null); - - dataLdr.perNodeBufferSize(32); - - for (int i = 0; i < 100000; i += 2) { - dataLdr.addData(i, i); - dataLdr.removeData(i + 1); - } - - U.awaitQuiet(barrier); - - info("Closing data loader."); - - try { - dataLdr.close(true); - } - catch (IllegalStateException ignore) { - // This is ok to ignore this exception as test is racy by it's nature - - // grid is stopping in different thread. - } - } - finally { - G.stopAll(true); - } - } - - /** - * Data loader should correctly load entries from HashMap in case of grids with more than one node - * and with GridOptimizedMarshaller that requires serializable. - * - * @throws Exception If failed. - */ - public void testAddDataFromMap() throws Exception { - try { - cnt = 0; - - startGrids(2); - - Ignite g0 = grid(0); - - IgniteMarshaller marsh = g0.configuration().getMarshaller(); - - if (marsh instanceof IgniteOptimizedMarshaller) - assertTrue(((IgniteOptimizedMarshaller)marsh).isRequireSerializable()); - else - fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName()); - - IgniteDataLoader<Integer, String> dataLdr = g0.dataLoader(null); - - Map<Integer, String> map = U.newHashMap(KEYS_COUNT); - - for (int i = 0; i < KEYS_COUNT; i ++) - map.put(i, String.valueOf(i)); - - dataLdr.addData(map); - - dataLdr.close(); - - Random rnd = new Random(); - - GridCache<Integer, String> c = g0.cache(null); - - for (int i = 0; i < KEYS_COUNT; i ++) { - Integer k = rnd.nextInt(KEYS_COUNT); - - String v = c.get(k); - - assertEquals(k.toString(), v); - } - } - finally { - G.stopAll(true); - } - } - - /** - * Gets cache configuration. - * - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration() { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setBackups(1); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - - return cacheCfg; - } - - /** - * - */ - private static class TestObject implements Serializable { - /** */ - private int val; - - /** - */ - private TestObject() { - // No-op. - } - - /** - * @param val Value. - */ - private TestObject(int val) { - this.val = val; - } - - public Integer val() { - return val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj instanceof TestObject && ((TestObject)obj).val == val; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderPerformanceTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderPerformanceTest.java deleted file mode 100644 index 57a2e2e..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderPerformanceTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.util.concurrent.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - * Data loader performance test. Compares group lock data loader to traditional lock. - * <p> - * Disable assertions and give at least 2 GB heap to run this test. - */ -public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int GRID_CNT = 3; - - /** */ - private static final int ENTRY_CNT = 80000; - - /** */ - private boolean useCache; - - /** */ - private boolean useGrpLock; - - /** */ - private String[] vals = new String[2048]; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(spi); - - cfg.setIncludeProperties(); - - cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - - cfg.setRestEnabled(false); - - cfg.setPeerClassLoadingEnabled(true); - - if (useCache) { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(PARTITIONED); - - cc.setDistributionMode(PARTITIONED_ONLY); - cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setStartSize(ENTRY_CNT / GRID_CNT); - cc.setSwapEnabled(false); - - cc.setBackups(1); - - cc.setStoreValueBytes(true); - - cfg.setCacheSanityCheckEnabled(false); - cfg.setCacheConfiguration(cc); - } - else - cfg.setCacheConfiguration(); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - for (int i = 0; i < vals.length; i++) { - int valLen = ThreadLocalRandom8.current().nextInt(128, 512); - - StringBuilder sb = new StringBuilder(); - - for (int j = 0; j < valLen; j++) - sb.append('a' + ThreadLocalRandom8.current().nextInt(20)); - - vals[i] = sb.toString(); - - info("Value: " + vals[i]); - } - } - - /** - * @throws Exception If failed. - */ - public void testPerformance() throws Exception { - useGrpLock = false; - - doTest(); - } - - /** - * @throws Exception If failed. - */ - public void testPerformanceGroupLock() throws Exception { - useGrpLock = true; - - doTest(); - } - - /** - * @throws Exception If failed. - */ - private void doTest() throws Exception { - System.gc(); - System.gc(); - System.gc(); - - try { - useCache = true; - - startGridsMultiThreaded(GRID_CNT); - - useCache = false; - - Ignite ignite = startGrid(); - - final IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(null); - - ldr.perNodeBufferSize(8192); - ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, String>groupLocked() : - GridDataLoadCacheUpdaters.<Integer, String>batchedSorted()); - ldr.autoFlushFrequency(0); - - final LongAdder cnt = new LongAdder(); - - long start = U.currentTimeMillis(); - - Thread t = new Thread(new Runnable() { - @SuppressWarnings("BusyWait") - @Override public void run() { - while (true) { - try { - Thread.sleep(10000); - } - catch (InterruptedException ignored) { - break; - } - - info(">>> Adds/sec: " + cnt.sumThenReset() / 10); - } - } - }); - - t.setDaemon(true); - - t.start(); - - int threadNum = 2;//Runtime.getRuntime().availableProcessors(); - - multithreaded(new Callable<Object>() { - @SuppressWarnings("InfiniteLoopStatement") - @Override public Object call() throws Exception { - ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); - - while (true) { - int i = rnd.nextInt(ENTRY_CNT); - - ldr.addData(i, vals[rnd.nextInt(vals.length)]); - - cnt.increment(); - } - } - }, threadNum, "loader"); - - info("Closing loader..."); - - ldr.close(false); - - long duration = U.currentTimeMillis() - start; - - info("Finished performance test. Duration: " + duration + "ms."); - } - finally { - stopAllGrids(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java deleted file mode 100644 index 19b8ee3..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java +++ /dev/null @@ -1,883 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.fifo.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * - */ -public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { - /** */ - private static ConcurrentHashMap<Object, Object> storeMap; - - /** */ - private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private GridCacheMode mode = PARTITIONED; - - /** */ - private boolean nearEnabled = true; - - /** */ - private boolean useCache; - - /** */ - private boolean useGrpLock; - - /** */ - private TestStore store; - - /** {@inheritDoc} */ - @Override public void afterTest() throws Exception { - super.afterTest(); - - useCache = false; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional", "unchecked"}) - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - cfg.setIncludeProperties(); - - cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); - - if (useCache) { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(mode); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); - cc.setWriteSynchronizationMode(FULL_SYNC); - - cc.setEvictionPolicy(new GridCacheFifoEvictionPolicy(10000)); - - cc.setEvictSynchronized(false); - cc.setEvictNearSynchronized(false); - - if (store != null) { - cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - } - - cfg.setCacheConfiguration(cc); - } - else - cfg.setCacheConfiguration(); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testPartitioned() throws Exception { - mode = PARTITIONED; - - checkDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testColocated() throws Exception { - mode = PARTITIONED; - nearEnabled = false; - - checkDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedGroupLock() throws Exception { - mode = PARTITIONED; - useGrpLock = true; - - checkDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testReplicated() throws Exception { - mode = REPLICATED; - - checkDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedGroupLock() throws Exception { - mode = REPLICATED; - useGrpLock = true; - - checkDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testLocal() throws Exception { - mode = LOCAL; - - try { - checkDataLoader(); - - assert false; - } - catch (IgniteCheckedException e) { - // Cannot load local cache configured remotely. - info("Caught expected exception: " + e); - } - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("ErrorNotRethrown") - private void checkDataLoader() throws Exception { - try { - Ignite g1 = startGrid(1); - - useCache = true; - - Ignite g2 = startGrid(2); - Ignite g3 = startGrid(3); - - final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null); - - ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() : - GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted()); - - final AtomicInteger idxGen = new AtomicInteger(); - final int cnt = 400; - final int threads = 10; - - final CountDownLatch l1 = new CountDownLatch(threads); - - IgniteFuture<?> f1 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - int idx = idxGen.getAndIncrement(); - - futs.add(ldr.addData(idx, idx)); - } - - l1.countDown(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, threads); - - l1.await(); - - // This will wait until data loader finishes loading. - stopGrid(getTestGridName(1), false); - - f1.get(); - - int s2 = g2.cache(null).primaryKeySet().size(); - int s3 = g3.cache(null).primaryKeySet().size(); - int total = threads * cnt; - - assertEquals(total, s2 + s3); - - final IgniteDataLoader<Integer, Integer> rmvLdr = g2.dataLoader(null); - - rmvLdr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() : - GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted()); - - final CountDownLatch l2 = new CountDownLatch(threads); - - IgniteFuture<?> f2 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - final int key = idxGen.decrementAndGet(); - - futs.add(rmvLdr.removeData(key)); - } - - l2.countDown(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, threads); - - l2.await(); - - rmvLdr.close(false); - - f2.get(); - - s2 = g2.cache(null).primaryKeySet().size(); - s3 = g3.cache(null).primaryKeySet().size(); - - assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; - } - finally { - stopAllGrids(); - } - } - - /** - * Test primitive arrays can be passed into data loader. - * - * @throws Exception If failed. - */ - public void testPrimitiveArrays() throws Exception { - try { - useCache = true; - mode = PARTITIONED; - - Ignite g1 = startGrid(1); - startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). - - List<Object> arrays = Arrays.<Object>asList( - new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, - new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); - - IgniteDataLoader<Object, Object> dataLdr = g1.dataLoader(null); - - for (int i = 0, size = arrays.size(); i < 1000; i++) { - Object arr = arrays.get(i % size); - - dataLdr.addData(i, arr); - dataLdr.addData(i, fixedClosure(arr)); - } - - dataLdr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedMultiThreaded() throws Exception { - mode = REPLICATED; - - checkLoaderMultithreaded(1, 2); - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedMultiThreadedGroupLock() throws Exception { - mode = REPLICATED; - useGrpLock = true; - - checkLoaderMultithreaded(1, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedMultiThreaded() throws Exception { - mode = PARTITIONED; - - checkLoaderMultithreaded(1, 3); - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedMultiThreadedGroupLock() throws Exception { - mode = PARTITIONED; - useGrpLock = true; - - checkLoaderMultithreaded(1, 3); - } - - /** - * Tests loader in multithreaded environment with various count of grids started. - * - * @param nodesCntNoCache How many nodes should be started without cache. - * @param nodesCntCache How many nodes should be started with cache. - * @throws Exception If failed. - */ - protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) - throws Exception { - try { - // Start all required nodes. - int idx = 1; - - for (int i = 0; i < nodesCntNoCache; i++) - startGrid(idx++); - - useCache = true; - - for (int i = 0; i < nodesCntCache; i++) - startGrid(idx++); - - Ignite g1 = grid(1); - - // Get and configure loader. - final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null); - - ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() : - GridDataLoadCacheUpdaters.<Integer, Integer>individual()); - ldr.perNodeBufferSize(2); - - // Define count of puts. - final AtomicInteger idxGen = new AtomicInteger(); - - final AtomicBoolean done = new AtomicBoolean(); - - try { - final int totalPutCnt = 50000; - - IgniteFuture<?> fut1 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Collection<IgniteFuture<?>> futs = new ArrayList<>(); - - while (!done.get()) { - int idx = idxGen.getAndIncrement(); - - if (idx >= totalPutCnt) { - info(">>> Stopping producer thread since maximum count of puts reached."); - - break; - } - - futs.add(ldr.addData(idx, idx)); - } - - ldr.flush(); - - for (IgniteFuture<?> fut : futs) - fut.get(); - - return null; - } - }, 5, "producer"); - - IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!done.get()) { - ldr.flush(); - - U.sleep(100); - } - - return null; - } - }, 1, "flusher"); - - // Define index of node being restarted. - final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; - - IgniteFuture<?> fut3 = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - try { - for (int i = 0; i < 5; i++) { - Ignite g = startGrid(restartNodeIdx); - - UUID id = g.cluster().localNode().id(); - - info(">>>>>>> Started node: " + id); - - U.sleep(1000); - - stopGrid(getTestGridName(restartNodeIdx), true); - - info(">>>>>>> Stopped node: " + id); - } - } - finally { - done.set(true); - - info("Start stop thread finished."); - } - - return null; - } - }, 1, "start-stop-thread"); - - fut1.get(); - fut2.get(); - fut3.get(); - } - finally { - ldr.close(false); - } - - info("Cache size on second grid: " + grid(nodesCntNoCache + 1).cache(null).primaryKeySet().size()); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testLoaderApi() throws Exception { - useCache = true; - - try { - Ignite g1 = startGrid(1); - - IgniteDataLoader<Object, Object> ldr = g1.dataLoader(null); - - ldr.close(false); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - ldr.future().get(); - - try { - // Create another loader. - ldr = g1.dataLoader("UNKNOWN_CACHE"); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - ldr.close(true); - - assert ldr.future().isDone(); - - ldr.future().get(); - - // Create another loader. - ldr = g1.dataLoader(null); - - // Cancel with future. - ldr.future().cancel(); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - try { - ldr.future().get(); - - assert false; - } - catch (IgniteFutureCancelledException e) { - info("Caught expected exception: " + e); - } - - // Create another loader. - ldr = g1.dataLoader(null); - - // This will close loader. - stopGrid(getTestGridName(1), false); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - ldr.future().get(); - } - finally { - stopAllGrids(); - } - } - - /** - * Wraps integer to closure returning it. - * - * @param i Value to wrap. - * @return Callable. - */ - private static Callable<Integer> callable(@Nullable final Integer i) { - return new Callable<Integer>() { - @Override public Integer call() throws Exception { - return i; - } - }; - } - - /** - * Wraps integer to closure returning it. - * - * @param i Value to wrap. - * @return Closure. - */ - private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) { - return new IgniteClosure<Integer, Integer>() { - @Override public Integer apply(Integer e) { - return e == null ? i : e + i; - } - }; - } - - /** - * Wraps object to closure returning it. - * - * @param obj Value to wrap. - * @return Closure. - */ - private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) { - return new IgniteClosure<T, T>() { - @Override public T apply(T e) { - assert e == null || obj == null || e.getClass() == obj.getClass() : - "Expects the same types [e=" + e + ", obj=" + obj + ']'; - - return obj; - } - }; - } - - /** - * Wraps integer to closure expecting it and returning {@code null}. - * - * @param exp Expected closure value. - * @return Remove expected cache value closure. - */ - private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) { - return new IgniteClosure<T, T>() { - @Override public T apply(T act) { - if (exp == null ? act == null : exp.equals(act)) - return null; - - throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); - } - }; - } - - /** - * @throws Exception If failed. - */ - public void testFlush() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - final GridCache<Integer, Integer> c = g.cache(null); - - final IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null); - - ldr.perNodeBufferSize(10); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.isEmpty()); - - multithreaded(new Callable<Void>() { - @Override - public Void call() throws Exception { - ldr.flush(); - - assertEquals(9, c.size()); - - return null; - } - }, 5, "flush-checker"); - - ldr.addData(100, 100); - - ldr.flush(); - - assertEquals(10, c.size()); - - ldr.addData(200, 200); - - ldr.close(false); - - ldr.future().get(); - - assertEquals(11, c.size()); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTryFlush() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - GridCache<Integer, Integer> c = g.cache(null); - - IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null); - - ldr.perNodeBufferSize(10); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.isEmpty()); - - ldr.tryFlush(); - - Thread.sleep(100); - - assertEquals(9, c.size()); - - ldr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testFlushTimeout() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - final CountDownLatch latch = new CountDownLatch(9); - - g.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - latch.countDown(); - - return true; - } - }, EVT_CACHE_OBJECT_PUT); - - GridCache<Integer, Integer> c = g.cache(null); - - assertTrue(c.isEmpty()); - - IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null); - - ldr.perNodeBufferSize(10); - ldr.autoFlushFrequency(3000); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.isEmpty()); - - assertFalse(latch.await(1000, MILLISECONDS)); - - assertTrue(c.isEmpty()); - - assertTrue(latch.await(3000, MILLISECONDS)); - - assertEquals(9, c.size()); - - ldr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testUpdateStore() throws Exception { - storeMap = new ConcurrentHashMap<>(); - - try { - store = new TestStore(); - - useCache = true; - - Ignite ignite = startGrid(1); - - startGrid(2); - startGrid(3); - - for (int i = 0; i < 1000; i++) - storeMap.put(i, i); - - try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { - assertFalse(ldr.skipStore()); - - for (int i = 0; i < 1000; i++) - ldr.removeData(i); - - for (int i = 1000; i < 2000; i++) - ldr.addData(i, i); - } - - for (int i = 0; i < 1000; i++) - assertNull(storeMap.get(i)); - - for (int i = 1000; i < 2000; i++) - assertEquals(i, storeMap.get(i)); - - try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { - ldr.skipStore(true); - - for (int i = 0; i < 1000; i++) - ldr.addData(i, i); - - for (int i = 1000; i < 2000; i++) - ldr.removeData(i); - } - - IgniteCache<Object, Object> cache = ignite.jcache(null); - - for (int i = 0; i < 1000; i++) { - assertNull(storeMap.get(i)); - - assertEquals(i, cache.get(i)); - } - - for (int i = 1000; i < 2000; i++) { - assertEquals(i, storeMap.get(i)); - - assertNull(cache.localPeek(i)); - } - } - finally { - storeMap = null; - } - } - - /** - * - */ - private static class TestObject { - /** Value. */ - private final int val; - - /** - * @param val Value. - */ - private TestObject(int val) { - this.val = val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - TestObject obj = (TestObject)o; - - return val == obj.val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - } - - /** - * - */ - private class TestStore extends CacheStoreAdapter<Object, Object> { - /** {@inheritDoc} */ - @Nullable @Override public Object load(Object key) { - return storeMap.get(key); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<?, ?> entry) { - storeMap.put(entry.getKey(), entry.getValue()); - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - storeMap.remove(key); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java index 92cda61..076f9fc 100644 --- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java +++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java @@ -20,9 +20,9 @@ package org.gridgain.testsuites.bamboo; import junit.framework.*; import org.gridgain.grid.*; import org.gridgain.grid.kernal.*; -import org.gridgain.grid.kernal.processors.affinity.*; -import org.gridgain.grid.kernal.processors.closure.*; -import org.gridgain.grid.kernal.processors.continuous.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.closure.*; +import org.apache.ignite.internal.processors.continuous.*; import org.gridgain.grid.product.*; import org.gridgain.grid.spi.*; import org.apache.ignite.internal.util.typedef.internal.*;
