Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 fbe025f68 -> 2be7cf033
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java deleted file mode 100644 index 9c34085..0000000 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.index.*; -import org.apache.ignite.streamer.index.hash.*; -import org.apache.ignite.streamer.index.tree.*; -import org.apache.ignite.streamer.window.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.loadtests.util.GridLoadTestArgs.*; -import static org.apache.ignite.testframework.GridTestUtils.*; - -/** - * Load test for streamer index. - */ -public class GridStreamerIndexLoadTest { - /** - * Window index configuration. - */ - private enum IndexConfiguration { - /** - * Tree index with non-unique elements and no event tracking. - */ - TREE_INDEX_NOT_UNIQUE { - /** {@inheritDoc} */ - @Override - StreamerIndexProvider<Integer, Integer, Long> indexProvider() { - StreamerTreeIndexProvider<Integer, Integer, Long> idx = new StreamerTreeIndexProvider<>(); - - idx.setUpdater(new IndexUpdater()); - idx.setUnique(false); - idx.setPolicy(StreamerIndexPolicy.EVENT_TRACKING_OFF); - - return idx; - } - }, - - /** - * Hash index with non-unique elements and no event tracking. - */ - HASH_INDEX_NOT_UNIQUE { - /** {@inheritDoc} */ - @Override - StreamerIndexProvider<Integer, Integer, Long> indexProvider() { - StreamerHashIndexProvider<Integer, Integer, Long> idx = new StreamerHashIndexProvider<>(); - - idx.setUpdater(new IndexUpdater()); - idx.setUnique(false); - idx.setPolicy(StreamerIndexPolicy.EVENT_TRACKING_OFF); - - return idx; - } - }; - - /** - * @return Index provider for this index configuration. - */ - abstract StreamerIndexProvider<Integer, Integer, Long> indexProvider(); - } - - /** - * @param args Command line arguments. - * @throws Exception If error occurs. - */ - public static void main(String[] args) throws Exception { - for (IndexConfiguration idxCfg : EnumSet.allOf(IndexConfiguration.class)) { - X.println(">>> Running benchmark for configuration: " + idxCfg); - - runBenchmark(idxCfg); - } - } - - /** - * Runs the benchmark for the specified index configuration. - * - * @param idxCfg Index configuration. - * @throws Exception If error occurs. - */ - public static void runBenchmark(IndexConfiguration idxCfg) throws Exception { - int thrCnt = getIntProperty(THREADS_CNT, 1); - int dur = getIntProperty(TEST_DUR_SEC, 60); - int winSize = getIntProperty("IGNITE_WIN_SIZE", 5000); - - dumpProperties(System.out); - - final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(winSize); - win.setIndexes(idxCfg.indexProvider()); - - win.start(); - - final AtomicLong enqueueCntr = new AtomicLong(); - - IgniteInternalFuture<Long> enqueueFut = runMultiThreadedAsync(new CAX() { - @Override public void applyx() { - Random rnd = new Random(); - - while (!Thread.currentThread().isInterrupted()) { - win.enqueue(rnd.nextInt()); - - enqueueCntr.incrementAndGet(); - } - } - }, thrCnt, "generator"); - - final AtomicLong evictCntr = new AtomicLong(); - - IgniteInternalFuture<Long> evictFut = runMultiThreadedAsync(new CAX() { - @Override public void applyx() { - while (!Thread.currentThread().isInterrupted()) { - win.pollEvicted(); - - evictCntr.incrementAndGet(); - } - } - }, thrCnt, "evictor"); - - IgniteInternalFuture<Long> collFut = runMultiThreadedAsync(new CAX() { - @Override public void applyx() { - int nSec = 0; - long prevEnqueue = enqueueCntr.get(); - long prevEvict = evictCntr.get(); - - try { - while (!Thread.currentThread().isInterrupted()) { - U.sleep(1000); - nSec++; - - long curEnqueue = enqueueCntr.get(); - long curEvict = evictCntr.get(); - - X.println("Stats [enqueuePerSec=" + (curEnqueue - prevEnqueue) + - ", evictPerSec=" + (curEvict - prevEvict) + ']'); - - prevEnqueue = curEnqueue; - prevEvict = curEvict; - } - } - catch (IgniteInterruptedCheckedException ignored) { - // No-op. - } - - X.println("Final results [enqueuePerSec=" + (enqueueCntr.get() / nSec) + - ", evictPerSec=" + (evictCntr.get() / nSec) + ']'); - } - }, 1, "collector"); - - U.sleep(dur * 1000); - - X.println("Finishing test."); - - collFut.cancel(); - enqueueFut.cancel(); - evictFut.cancel(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java deleted file mode 100644 index 583ed56..0000000 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; - -import java.util.*; - -/** - * Configurable streamer load. - */ -public class GridStreamerLoad { - /** Steamer name. */ - private String name; - - /** Load closures. */ - private List<IgniteInClosure<IgniteStreamer>> clos; - - /** - * @return Steamer name. - */ - public String getName() { - return name; - } - - /** - * @param name Steamer name. - */ - public void setName(String name) { - this.name = name; - } - - /** - * @return Query closure. - */ - public List<IgniteInClosure<IgniteStreamer>> getClosures() { - return clos; - } - - /** - * @param clos Query closure. - */ - public void setClosures(List<IgniteInClosure<IgniteStreamer>> clos) { - this.clos = clos; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java deleted file mode 100644 index 95d76f5..0000000 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.streamer; - -import org.apache.ignite.streamer.index.*; -import org.jetbrains.annotations.*; - -/** - * Streamer benchmark window index updater. - */ -class IndexUpdater implements StreamerIndexUpdater<Integer, Integer, Long> { - /** {@inheritDoc} */ - @Override public Integer indexKey(Integer evt) { - return evt; - } - - /** {@inheritDoc} */ - @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { - return entry.value() + 1; - } - - /** {@inheritDoc} */ - @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) { - return entry.value() - 1 == 0 ? null : entry.value() - 1; - } - - /** {@inheritDoc} */ - @Override public Long initialValue(Integer evt, Integer key) { - return 1L; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java deleted file mode 100644 index a8f0d70..0000000 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -/** - * Closure for events generation. - */ -class QueryClosure implements IgniteInClosure<IgniteStreamer> { - /** Sleep period (seconds). */ - private static final int SLEEP_PERIOD_SEC = 3; - - /** Random range. */ - private int rndRange = 100; - - /** Warmup time. */ - private long warmup = 60000; - - /** {@inheritDoc} */ - @Override public void apply(IgniteStreamer streamer) { - X.println("Pefromrming warmup: " + warmup + "ms..."); - - try { - U.sleep(warmup); - } - catch (IgniteInterruptedCheckedException ignore) { - return; - } - - long initTime = System.currentTimeMillis(); - long initExecs = streamer.metrics().stageTotalExecutionCount(); - - long prevExecs = initExecs; - - while (!Thread.interrupted()) { - try { - U.sleep(SLEEP_PERIOD_SEC * 1000); - } - catch (IgniteInterruptedCheckedException ignore) { - return; - } - - long curTime = System.currentTimeMillis(); - long curExecs = streamer.metrics().stageTotalExecutionCount(); - - long deltaExecs = curExecs - prevExecs; - long deltaThroughput = deltaExecs/SLEEP_PERIOD_SEC; - - long totalTimeSec = (curTime - initTime) / 1000; - long totalExecs = curExecs - initExecs; - long totalThroughput = totalExecs/totalTimeSec; - - X.println("Measurement: [throughput=" + deltaThroughput + " execs/sec, totalThroughput=" + - totalThroughput + " execs/sec]"); - - prevExecs = curExecs; - } - } - - /** - * @return Random range. - */ - public int getRandomRange() { - return rndRange; - } - - /** - * @param rndRange Random range. - */ - public void setRandomRange(int rndRange) { - this.rndRange = rndRange; - } - - /** - * @return Warmup time (milliseconds) - */ - public long getWarmup() { - return warmup; - } - - /** - * @param warmup Warmup time (milliseconds) - */ - public void setWarmup(long warmup) { - this.warmup = warmup; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java deleted file mode 100644 index fdf2aa7..0000000 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.streamer.average; - -/** - * Average helper class. - */ -class TestAverage { - /** */ - private int total; - - /** */ - private int cnt; - - /** - * @param avg Average. - */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - public void increment(TestAverage avg) { - int total; - int cnt; - - synchronized (avg) { - total = avg.total; - cnt = avg.cnt; - } - - increment(total, cnt); - } - - /** - * @param total Increment total. - * @param cnt Increment count. - */ - public synchronized void increment(int total, int cnt) { - this.total += total; - this.cnt += cnt; - } - - /** - * @param total Total. - * @param cnt Count. - */ - public synchronized void set(int total, int cnt) { - this.total = total; - this.cnt = cnt; - } - - /** - * @return Running average. - */ - public synchronized double average() { - return (double)total / cnt; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java deleted file mode 100644 index 23c30fc..0000000 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.streamer.average; - -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.streamer.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Stage for average benchmark. - */ -class TestStage implements StreamerStage<Integer> { - /** {@inheritDoc} */ - @Override public String name() { - return "stage"; - } - - /** {@inheritDoc} */ - @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> evts) { - ConcurrentMap<String, TestAverage> loc = ctx.localSpace(); - - TestAverage avg = loc.get("avg"); - - if (avg == null) - avg = F.addIfAbsent(loc, "avg", new TestAverage()); - - for (Integer e : evts) - avg.increment(e, 1); - - StreamerWindow<Integer> win = ctx.window(); - - win.enqueueAll(evts); - - while (true) { - Integer e = win.pollEvicted(); - - if (e == null) - break; - - // Subtract evicted events from running total. - avg.increment(-e, -1); - } - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java index e4326bf..3eb2909 100644 --- a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java @@ -27,13 +27,10 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.executor.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.service.*; -import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.p2p.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.window.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; import org.jetbrains.annotations.*; @@ -102,7 +99,6 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest namedCache.setAtomicityMode(TRANSACTIONAL); cfg.setMarshaller(marshaller()); - cfg.setStreamerConfiguration(streamerConfiguration()); cfg.setCacheConfiguration(new CacheConfiguration(), namedCache); return cfg; @@ -113,32 +109,6 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest */ protected abstract Marshaller marshaller(); - /** - * @return Streamer configuration. - */ - private static StreamerConfiguration streamerConfiguration() { - Collection<StreamerStage> stages = F.<StreamerStage>asList(new StreamerStage() { - @Override - public String name() { - return "name"; - } - - @Nullable - @Override - public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) { - return null; - } - }); - - StreamerConfiguration cfg = new StreamerConfiguration(); - - cfg.setAtLeastOnce(true); - cfg.setWindows(F.asList((StreamerWindow) new StreamerUnboundedWindow())); - cfg.setStages(stages); - - return cfg; - } - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { marsh = grid().configuration().getMarshaller(); @@ -797,32 +767,6 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest } /** - * @throws Exception If failed. - */ - public void testStreamer() throws Exception { - IgniteStreamer streamer = grid().streamer(null); - - streamer.addEvent("test"); - - GridMarshallerTestBean inBean = newTestBean(streamer); - - byte[] buf = marshal(inBean); - - GridMarshallerTestBean outBean = unmarshal(buf); - - assert inBean.getObjectField() != null; - assert outBean.getObjectField() != null; - - assert inBean.getObjectField().getClass().equals(IgniteStreamerImpl.class); - assert outBean.getObjectField().getClass().equals(IgniteStreamerImpl.class); - - assert inBean != outBean; - assert inBean.equals(outBean); - - outBean.checkNullResources(); - } - - /** * @param obj Object field to use. * @return New test bean. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java b/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java deleted file mode 100644 index 869dd94..0000000 --- a/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java +++ /dev/null @@ -1,686 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.index; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.index.hash.*; -import org.apache.ignite.streamer.index.tree.*; -import org.apache.ignite.streamer.window.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.streamer.index.StreamerIndexPolicy.*; -import static org.apache.ignite.testframework.GridTestUtils.*; - -/** - * Tests for Streamer window index. - */ -public class GridStreamerIndexSelfTest extends GridCommonAbstractTest { - /** - * @throws Exception If failed. - */ - public void testTreeIndex() throws Exception { - for (StreamerIndexPolicy plc : StreamerIndexPolicy.values()) { - checkUniqueIndex(indexProvider(true, "idx", new UniqueStringIndexUpdater(), plc, true)); - - checkNonUniqueIndex(indexProvider(true, "idx", new IndexUpdater(), plc, false)); - } - } - - /** - * @throws Exception If failed. - */ - public void testHashIndex() throws Exception { - for (StreamerIndexPolicy plc : StreamerIndexPolicy.values()) { - checkUniqueIndex(indexProvider(false, "idx", new UniqueStringIndexUpdater(), plc, true)); - - checkNonUniqueIndex(indexProvider(false, "idx", new IndexUpdater(), plc, false)); - } - } - - /** - * @throws Exception If failed. - */ - public void testMultipleIndexUpdate() throws Exception { - StreamerIndexProvider<String, String, Integer> idxProvider = - indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON, false); - - StreamerIndexProvider<String, String, String> idxProvider1 = - indexProvider(true, "idx1", new UniqueStringIndexUpdater(), EVENT_TRACKING_ON, true); - - StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(5); - win.setIndexes(idxProvider, idxProvider1); - - win.start(); - - win.enqueue("A"); - win.enqueue("B"); - win.enqueue("C"); - win.enqueue("D"); - - // Snapshot both indexes. - StreamerIndex<String, String, Integer> idx = win.index("idx"); - StreamerIndex<String, String, String> idx1 = win.index("idx1"); - - info("Idx: " + idx.entries(0)); - info("Idx1: " + idx1.entries(0)); - - try { - win.enqueue("A"); - - fail("Exception should have been thrown."); - } - catch (IgniteException e) { - info("Caught expected exception: " + e); - } - - StreamerIndex<String, String, Integer> idxAfter = win.index("idx"); - StreamerIndex<String, String, String> idx1After = win.index("idx1"); - - info("Idx (after): " + idxAfter.entries(0)); - info("Idx1 (after): " + idx1After.entries(0)); - - assertEquals(4, idx.entries(0).size()); - assertEquals(4, idx1.entries(0).size()); - - assertTrue(F.eqOrdered(idx.entries(0), idxAfter.entries(0))); - assertTrue(F.eqOrdered(idx1.entries(0), idx1After.entries(0))); - - idxProvider.reset(); - - assertEquals(4, idx.entries(0).size()); - } - - /** - * @throws Exception If failed. - */ - public void testSortedIndexMultithreaded() throws Exception { - checkSortedIndexMultithreaded(32, 500, false); - } - - /** - * @throws Exception If failed. - */ - public void testSortedIndexMultithreadedWithConcurrentPollEvicted() throws Exception { - checkSortedIndexMultithreaded(32, 500, true); - } - - /** - * @throws Exception If failed. - */ - public void testUniqueHashIndexMultithreaded() throws Exception { - checkUniqueHashIndexMultithreaded(32, 500); - } - - /** - * @throws Exception If failed. - */ - public void testUpdaterIndexKeyNull() throws Exception { - checkIndexUpdater(new IndexUpdater() { - @Nullable @Override public String indexKey(String evt) { - return "A".equals(evt) ? null : evt; - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testUpdaterInitialValueNull() throws Exception { - checkIndexUpdater(new IndexUpdater() { - @Nullable @Override public Integer initialValue(String evt, String key) { - return "A".equals(evt) ? null : 1; - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testUpdaterOnAddedNull() throws Exception { - checkIndexUpdater(new IndexUpdater() { - @Nullable @Override - public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) { - return "A".equals(evt) ? null : entry.value() + 1; - } - }); - } - - /** - * Checks the correct behaviour of {@link StreamerIndexUpdater}, given that - * it discards event "A" and accepts event "B". - * - * @param updater Index updater. - */ - private void checkIndexUpdater(StreamerIndexUpdater<String, String, Integer> updater) { - List<StreamerIndexProvider<String, String, Integer>> idxps = Arrays.asList( - indexProvider(true, "tree", updater, StreamerIndexPolicy.EVENT_TRACKING_ON, false), - indexProvider(false, "hash", updater, StreamerIndexPolicy.EVENT_TRACKING_ON, false)); - - for (StreamerIndexProvider<String, String, Integer> idxp : idxps) { - StreamerUnboundedWindow<String> win = new StreamerUnboundedWindow<>(); - - win.setIndexes(idxp); - - win.start(); - - win.enqueue("A"); - win.enqueue("A"); - win.enqueue("B"); - - StreamerIndex<String, Object, Object> idx = win.index(idxp.getName()); - - assertNotNull(idx); - - assertNull(idx.entry("A")); - - assertNotNull(idx.entry("B")); - } - } - - /** - * @param treeIdx {@code True} to create tree index. - * @param name Name. - * @param updater Updater. - * @param plc Policy. - * @param unique Unique. - * @return Index provider. - */ - private <E, K, V> StreamerIndexProvider<E, K, V> indexProvider(boolean treeIdx, String name, - StreamerIndexUpdater<E, K, V> updater, StreamerIndexPolicy plc, boolean unique) { - if (treeIdx) { - StreamerTreeIndexProvider<E, K, V> idx = new StreamerTreeIndexProvider<>(); - - idx.setName(name); - idx.setUpdater(updater); - idx.setUnique(unique); - idx.setPolicy(plc); - - return idx; - } - else { - StreamerHashIndexProvider<E, K, V> idx = new StreamerHashIndexProvider<>(); - - idx.setName(name); - idx.setUpdater(updater); - idx.setUnique(unique); - idx.setPolicy(plc); - - return idx; - } - } - - /** - * @param threadCnt Thread count. - * @param iters Number of iterations for each worker thread. - * @throws Exception If failed. - */ - private void checkUniqueHashIndexMultithreaded(int threadCnt, final int iters) - throws Exception { - StreamerIndexProvider<String, String, Integer> idxProvider = - indexProvider(false, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, true); - - for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) { - final StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(threadCnt * 2); - win.setIndexes(idxProvider); - - win.start(); - - final String evt = "evt" + i; - final AtomicInteger nIdxErrors = new AtomicInteger(); - - // Submit the same event in multiple threads. - runMultiThreaded(new CAX() { - @Override public void applyx() { - try { - win.enqueue(evt); - } - catch (IgniteException e) { - if (e.getMessage().contains("Index unique key violation")) - nIdxErrors.incrementAndGet(); - else - throw e; - } - } - }, threadCnt, "put"); - - // Only one thread should succeed, because the index is unique. - assertEquals(threadCnt - 1, nIdxErrors.get()); - - StreamerIndex<String, String, Integer> idx = win.index("idx"); - - // Only one event should be present and have value 1. - assertEquals(1, idx.entries(0).size()); - assertEquals((Integer)1, idx.entry(evt).value()); - } - } - - /** - * @param threadCnt Thread count. - * @param iters Number of iterations for each worker thread. - * @param pollEvicted Poll evicted events concurrently, if true. - * @throws Exception If failed. - */ - public void checkSortedIndexMultithreaded(final int threadCnt, final int iters, final boolean pollEvicted) - throws Exception { - final StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(threadCnt * 2); - win.setIndexes(indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, false)); - - win.start(); - - IgniteInternalFuture<Long> pollFut = null; - - if (pollEvicted) { - // These threads poll evicted events from the window if it doesn't break - // the test invariant. - pollFut = runMultiThreadedAsync(new CAX() { - @Override public void applyx() { - try { - while (!Thread.currentThread().isInterrupted()) { - StreamerIndex<String, String, Integer> idx = win.index("idx"); - - boolean canPoll = F.forAll( - idx.entries(-1 * threadCnt), - new P1<StreamerIndexEntry<String, String, Integer>>() { - @Override public boolean apply(StreamerIndexEntry<String, String, Integer> e) { - return e.value() > 2; - } - }); - - if (!canPoll || win.pollEvicted() == null) - U.sleep(50); - } - } - catch (IgniteInterruptedCheckedException ignored) { - // No-op. - } - } - }, threadCnt / 4, "test-poll"); - } - - try { - // Each of these threads generates a single event repeatedly and checks - // if it is still present in the window. In the tested index events are - // sorted by value and the value is a number of repeated events, so, this - // should be invariant. - IgniteInternalFuture<Long> fut1 = runMultiThreadedAsync(new CAX() { - @Override public void applyx() { - final String evt = Thread.currentThread().getName(); - int cntr = 1; - - for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) { - win.enqueue(evt); - - StreamerIndex<String, String, Integer> idx = win.index("idx"); - StreamerIndexEntry<String, String, Integer> entry = idx.entry(evt); - - assertNotNull(entry); - - // If concurrent eviction is disabled, check if the - // value grows each time we enqueue a new event. - if (!pollEvicted) - assertEquals((Integer)cntr++, entry.value()); - - // If queued event more than once, the first threadCnt entries - // in descending order should contain an entry with this thread's event. - if (i > 0) - assert idx.entries(-1 * threadCnt).contains(entry); - } - } - }, threadCnt / 2, "test-multi"); - - // This thread generates a set of single non-repeating events from 0 to iters. - IgniteInternalFuture<Long> fut2 = runMultiThreadedAsync(new CAX() { - @Override public void applyx() { - for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) - win.enqueue(String.valueOf(i)); - } - }, 1, "test-single"); - - fut2.get(getTestTimeout()); - fut1.get(getTestTimeout()); - } - finally { - if (pollFut != null) - pollFut.cancel(); - } - } - - /** - * @param idx Index. - */ - private void checkNonUniqueIndex(StreamerIndexProvider<String, String, Integer> idx) { - assert !idx.isUnique(); - - StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(5); - win.setIndexes(idx); - - win.start(); - - for (int i = 0; i < 20; ) { - win.enqueue("A" + i); i++; - win.enqueue("B"); i++; - win.enqueue("C"); i++; - win.enqueue("D"); i++; - } - - StreamerIndex<String, String, Integer> idx0 = win.index("idx"); - - String s; - - while ((s = win.pollEvicted()) != null) - info("Evicted String: " + s); - - StreamerIndex<String, String, Integer> idx1 = win.index("idx"); - - if (idx instanceof StreamerTreeIndexProvider) { // Tree index. - assert idx0.sorted(); - - // Users with unique names. - for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(1)) { - info("Entry [e=" + e + ", evts=" + e.events() + ']'); - - if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) { - assertEquals(1, e.events().size()); - assertEquals('A', F.first(e.events()).charAt(0)); - } - } - - assertTrue(idx0.entrySet(2).isEmpty()); - - for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(5)) { - info("Entry [e=" + e + ", evts=" + e.events() + ']'); - - if (idx.getPolicy() == EVENT_TRACKING_ON) - assertEquals(5, e.events().size()); - - else if (idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) - assertEquals(1, e.events().size()); - - else - assertNull(e.events()); - } - - assertEquals(5, idx0.entrySet(1).size()); - - List<StreamerIndexEntry<String, String, Integer>> asc = - new ArrayList<>(idx0.entrySet(true, null, true, null, true)); - List<StreamerIndexEntry<String, String, Integer>> desc = - new ArrayList<>(idx0.entrySet(false, null, true, null, true)); - - assertEquals(8, asc.size()); - assertEquals(8, desc.size()); - - for (int i = 0; i < asc.size(); i++) - assertEquals(asc.get(i), desc.get(desc.size() - i - 1)); - - try { - idx0.entrySet(true, 10, true, -10, true); - - assert false; - } - catch (IllegalArgumentException e) { - info("Caught expected exception: " + e); - } - - try { - idx0.entrySet(false, -10, true, 10, true); - - assert false; - } - catch (IllegalArgumentException e) { - info("Caught expected exception: " + e); - } - } - else - assert !idx0.sorted(); - - assertEquals(4, idx1.size()); - - for (StreamerIndexEntry<String, String, Integer> e : idx1.entries(0)) { - Collection<String> evts = e.events(); - - info("Entry [e=" + e + ", evts=" + evts + ']'); - - if (idx.getPolicy() == EVENT_TRACKING_ON) { - assert evts != null; - - switch (evts.size()) { - case 1: - assert F.containsAny(evts, "A16", "B", "C") : "Wrong tracked event: " + F.first(evts); - - break; - - case 2: - Collection<String> dedup = F.dedup(evts); - - assert dedup.size() == 1 && "D".equals(F.first(dedup)) : "Wrong tracked events: " + evts; - - break; - - default: - fail("Wrong tracked events: " + evts); - } - } - else if (idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) - assert evts != null && evts.size() == 1 && F.containsAny(evts, "A16", "B", "C", "D") : - "Wrong tracked events: " + evts; - else if (idx.getPolicy() == EVENT_TRACKING_OFF) - assert evts == null; - } - - // Check that idx0 is unaffected. - assertEquals(8, idx0.size()); - - idx.reset(); - - assertEquals(0, idx.index().size()); - assertEquals(8, idx0.size()); - } - - /** - * @param idx Index. - */ - private void checkUniqueIndex(StreamerIndexProvider<String, String, String> idx) { - assert idx.isUnique(); - - StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(5); - win.setIndexes(idx); - - win.start(); - - for (int i = 0; i < 20; i++) - win.enqueue("A" + i); - - for (int i = 0; i < 20; i++) { - try { - win.enqueue("A" + i); - - fail("Exception should have been thrown."); - } - catch (IgniteException e) { - info("Caught expected exception: " + e); - } - } - - StreamerIndex<String, String, String> idx0 = win.index("idx"); - - String s; - - while ((s = win.pollEvicted()) != null) - info("Evicted string: " + s); - - StreamerIndex<String, String, String> idx1 = win.index("idx"); - - if (idx instanceof StreamerTreeIndexProvider) { // Tree index. - assert idx0.sorted(); - - // Users with unique names. - for (StreamerIndexEntry<String, String, String> e : idx0.entrySet("A0")) { - info("Entry [e=" + e + ", evts=" + e.events() + ']'); - - if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) { - assertEquals(1, e.events().size()); - assertEquals('A', F.first(e.events()).charAt(0)); - } - } - - assertTrue(idx0.entrySet("B").isEmpty()); - - assertEquals(1, idx0.entrySet("A0").size()); - - List<StreamerIndexEntry<String, String, String>> asc = - new ArrayList<>(idx0.entrySet(true, null, true, null, true)); - List<StreamerIndexEntry<String, String, String>> desc = - new ArrayList<>(idx0.entrySet(false, null, true, null, true)); - - assertEquals(20, asc.size()); - assertEquals(20, desc.size()); - - for (int i = 0; i < asc.size(); i++) - assertEquals(asc.get(i), desc.get(desc.size() - i - 1)); - } - else - assert !idx0.sorted(); - - assertEquals(5, idx1.size()); - - for (StreamerIndexEntry<String, String, String> e : idx1.entries(0)) { - Collection<String> evts = e.events(); - - info("Entry [e=" + e + ", evts=" + evts + ']'); - - if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) { - assert evts != null && evts.size() == 1 : "Wrong tracked events: " + evts; - - int i = Integer.parseInt(F.first(evts).substring(1)); - - assert i >= 15 && i < 20 : "Wrong event: " + F.first(evts); - } - else if (idx.getPolicy() == EVENT_TRACKING_OFF) - assert evts == null; - } - - // Check that idx0 is unaffected. - assertEquals(20, idx0.size()); - - idx.reset(); - - assertEquals(0, idx.index().size()); - assertEquals(20, idx0.size()); - } - - /** - * Name index updater. - */ - private static class IndexUpdater implements StreamerIndexUpdater<String, String, Integer> { - /** {@inheritDoc} */ - @Nullable @Override public String indexKey(String evt) { - return evt; - } - - /** {@inheritDoc} */ - @Nullable @Override public Integer initialValue(String evt, String key) { - return 1; - } - - /** {@inheritDoc} */ - @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) { - return entry.value() + 1; - } - - /** {@inheritDoc} */ - @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry, - String evt) { - int res = entry.value() - 1; - - return res == 0 ? null : res; - } - } - - /** - * Name index updater. - */ - private static class HashIndexUpdater implements StreamerIndexUpdater<String, String, Integer> { - /** {@inheritDoc} */ - @Nullable @Override public String indexKey(String evt) { - return evt; - } - - /** {@inheritDoc} */ - @Nullable @Override public Integer initialValue(String evt, String key) { - return 1; - } - - /** {@inheritDoc} */ - @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) { - return entry.value() + 1; - } - - /** {@inheritDoc} */ - @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry, - String evt) { - int res = entry.value() - 1; - - return res == 0 ? null : res; - } - } - - /** - * Name index updater. - */ - private static class UniqueStringIndexUpdater implements StreamerIndexUpdater<String, String, String> { - /** {@inheritDoc} */ - @Nullable @Override public String indexKey(String evt) { - return evt; - } - - /** {@inheritDoc} */ - @Nullable @Override public String initialValue(String evt, String key) { - return evt; - } - - /** {@inheritDoc} */ - @Nullable @Override public String onAdded(StreamerIndexEntry<String, String, String> entry, String evt) { - throw new IgniteException("Unique key violation: " + evt); - } - - /** {@inheritDoc} */ - @Nullable @Override public String onRemoved(StreamerIndexEntry<String, String, String> entry, - String evt) { - // On remove we return null as index is unique. - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java b/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java deleted file mode 100644 index 27ce309..0000000 --- a/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java +++ /dev/null @@ -1,911 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Streamer window self test. - */ -public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { - /** - * @throws Exception If failed. - */ - public void testBoundedSizeWindowValidation() throws Exception { - final StreamerBoundedSizeWindow win = new StreamerBoundedSizeWindow(); - - win.start(); - - win.setMaximumSize(-1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedTimeWindowValidation() throws Exception { - final StreamerBoundedTimeWindow win = new StreamerBoundedTimeWindow(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setTimeInterval(1); - - win.start(); - - win.setMaximumSize(-1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSizeBatchWindowValidation() throws Exception { - final StreamerBoundedSizeBatchWindow win = new StreamerBoundedSizeBatchWindow(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setBatchSize(1); - - win.start(); - - win.setMaximumBatches(-1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedTimeBatchWindowValidation() throws Exception { - final StreamerBoundedTimeBatchWindow win = new StreamerBoundedTimeBatchWindow(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setBatchSize(1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setBatchTimeInterval(1); - win.setBatchSize(-1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setBatchSize(1); - - win.start(); - - win.setMaximumBatches(-1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedWindow() throws Exception { - final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(50); - - win.start(); - - for (int i = 0; i < 50; i++) - win.enqueue(i); - - assertNull(win.pollEvicted()); - - for(int i = 50; i < 60; i++) - win.enqueue(i); - - for (int i = 0; i < 10; i++) - assert i == win.pollEvicted(); - - assertNull(win.pollEvicted()); - - checkIterator(win); - - win.setMaximumSize(2); - - win.start(); - - win.enqueue(3, 2, 1); - - checkSnapshot(win.snapshot(true), 3, 2, 1); - checkSnapshot(win.snapshot(false), 2, 1); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedWindowUnique() throws Exception { - final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(50); - win.setUnique(true); - - win.start(); - - for (int i = 0; i < 50; i++) - win.enqueue(i); - - for (int i = 0; i < 50; i++) - win.enqueue(i); - - assertNull(win.pollEvicted()); - - int idx = 0; - - for (Object evt : win) { - Integer next = (Integer)evt; - - assertEquals((Integer)idx++, next); - } - - checkIterator(win); - - win.setMaximumSize(2); - - win.start(); - - win.enqueue(3, 2, 1, 3); - - checkSnapshot(win.snapshot(true), 3, 2, 1); - checkSnapshot(win.snapshot(false), 2, 1); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSortedWindow() throws Exception { - final StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>(); - - win.setMaximumSize(60); - - win.start(); - - for (int i = 59; i >= 0; i--) - win.enqueue(i); - - assertNull(win.pollEvicted()); - - for (int i = 59; i >= 0; i--) - win.enqueue(i); - - for (int i = 59; i >= 30; i--) { - assert i == win.pollEvicted(); - assert i == win.pollEvicted(); - } - - assertNull(win.pollEvicted()); - - checkIterator(win); - - win.setMaximumSize(2); - - win.start(); - - win.enqueue(3, 2, 1, 4); - - checkSnapshot(win.snapshot(true), 1, 2, 3, 4); - checkSnapshot(win.snapshot(false), 3, 4); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSortedWindowUnique() throws Exception { - final StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>(); - - win.setMaximumSize(-1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setMaximumSize(60); - win.setUnique(true); - - win.start(); - - for (int i = 59; i >= 0; i--) - win.enqueue(i); - - assertNull(win.pollEvicted()); - - for (int i = 59; i >= 0; i--) - win.enqueue(i); - - assertNull(win.pollEvicted()); - - for (int i = 99; i >= 60; i--) - win.enqueue(i); - - for (int i = 99; i >= 60; i--) - assert i == win.pollEvicted(); - - assertNull(win.pollEvicted()); - - checkIterator(win); - - win.setMaximumSize(2); - - win.start(); - - win.enqueue(3, 2, 1, 3, 4); - - checkSnapshot(win.snapshot(true), 1, 2, 3, 4); - checkSnapshot(win.snapshot(false), 3, 4); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSizeBatchDequeueWindow() throws Exception { - final StreamerBoundedSizeBatchWindow<Integer> win = new StreamerBoundedSizeBatchWindow<>(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setBatchSize(10); - win.setMaximumBatches(2); - - win.start(); - - for (int i = 0; i < 20; i++) - win.enqueue(i); - - assertNull(win.pollEvicted()); - assertEquals(0, win.pollEvictedBatch().size()); - - win.enqueue(20); - - Collection<Integer> evicted = win.pollEvictedBatch(); - - assertEquals(10, evicted.size()); - - Iterator<Integer> it = evicted.iterator(); - - for (int i = 0; i < 10; i++) - assert i == it.next(); - - assertNull(win.pollEvicted()); - assertEquals(0, win.pollEvictedBatch().size()); - - for (int i = 21; i < 30; i++) - win.enqueue(i); - - assertNull(win.pollEvicted()); - assertEquals(0, win.pollEvictedBatch().size()); - - win.enqueue(30); - - assert 10 == win.pollEvicted(); - - evicted = win.pollEvictedBatch(); - - assertEquals(9, evicted.size()); - - it = evicted.iterator(); - - for (int i = 11; i < 20; i++) - assert i == it.next(); - - assertNull(win.pollEvicted()); - assertEquals(0, win.pollEvictedBatch().size()); - - checkIterator(win); - - win.setMaximumBatches(2); - win.setBatchSize(2); - - win.start(); - - win.enqueue(1, 2, 3, 4, 5, 6, 7); - - // We expect that the first two batches will be evicted. - checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7); - checkSnapshot(win.snapshot(false), 5, 6, 7); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedTimeDequeueWindow() throws Exception { - final StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setMaximumSize(60); - win.setTimeInterval(40); - - win.start(); - - for (int i = 59; i >= 0; i--) - win.enqueue(i); - - assertNull(win.pollEvicted()); - - for (int i = 59; i >= 0; i--) - win.enqueue(i); - - for (int i = 59; i >= 0; i--) - assert i == win.pollEvicted(); - - assertNull(win.pollEvicted()); - - checkIterator(win); - - win.setMaximumSize(2); - win.setTimeInterval(200); - - win.start(); - - win.enqueue(1, 2, 3); - - checkSnapshot(win.snapshot(true), 1, 2, 3); - checkSnapshot(win.snapshot(false), 2, 3); - - U.sleep(400); - - win.enqueue(4); - - checkSnapshot(win.snapshot(true), 1, 2, 3, 4); - checkSnapshot(win.snapshot(false), 4); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedTimeBatchDequeueWindow() throws Exception { - final StreamerBoundedTimeBatchWindow<Integer> win = new StreamerBoundedTimeBatchWindow<>(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - win.start(); - - return null; - } - }, IgniteException.class, null); - - win.setBatchSize(50); - win.setBatchTimeInterval(500); - win.setMaximumBatches(2); - - win.start(); - - for (int i = 0; i < 25; i++) - win.enqueue(i); - - U.sleep(1000); - - Collection<Integer> evicted = win.pollEvictedBatch(); - - assertNotNull(evicted); - assertEquals(25, evicted.size()); - - for (int i = 0; i < 101; i++) - win.enqueue(i); - - evicted = win.pollEvictedBatch(); - - assertNotNull(evicted); - assertEquals(50, evicted.size()); - - U.sleep(1000); - - evicted = win.pollEvictedBatch(); - - assertNotNull(evicted); - assertEquals(50, evicted.size()); - - evicted = win.pollEvictedBatch(); - - assertNotNull(evicted); - assertEquals(1, evicted.size()); - - checkIterator(win); - - win.setMaximumBatches(2); - win.setBatchSize(2); - win.setBatchTimeInterval(200); - - win.start(); - - win.enqueue(1, 2, 3, 4, 5, 6, 7); - - // We expect that the first two batches will be evicted. - checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7); - checkSnapshot(win.snapshot(false), 5, 6, 7); - - U.sleep(400); - - checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7); - checkSnapshot(win.snapshot(false)); - } - - /** - * @throws Exception If failed. - */ - public void testUnboundedDequeueWindow() throws Exception { - final StreamerUnboundedWindow<Integer> win = new StreamerUnboundedWindow<>(); - - win.start(); - - for (int i = 0; i < 50; i++) - win.enqueue(i); - - assertNull(win.pollEvicted()); - - assert win.size() == 50; - - checkIterator(win); - - win.reset(); - - win.enqueue(3, 1, 2); - - checkSnapshot(win.snapshot(true), 3, 1, 2); - checkSnapshot(win.snapshot(false), 3, 1, 2); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSizeDequeueWindowMultithreaded() throws Exception { - StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(500); - win.setUnique(false); - - win.start(); - - checkWindowMultithreaded(win, 100000, 10, 1000); - - win.consistencyCheck(); - - finalChecks(win, 500); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSizeDequeueWindowUniqueMultithreaded() throws Exception { - StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>(); - - win.setMaximumSize(500); - win.setUnique(true); - - win.start(); - - checkWindowMultithreaded(win, 100000, 10, 1000); - - win.consistencyCheck(); - - finalChecks(win, 500); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSizeBatchDequeueWindowMultithreaded() throws Exception { - StreamerBoundedSizeBatchWindow<Integer> win = new StreamerBoundedSizeBatchWindow<>(); - - win.setMaximumBatches(10); - win.setBatchSize(50); - - win.start(); - - checkWindowMultithreaded(win, 100000, 10, 1000); - - win.consistencyCheck(); - - finalChecks(win, 500); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSizeSortedDequeueWindowMultithreaded() throws Exception { - StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>(); - - win.setMaximumSize(500); - win.setUnique(false); - - win.start(); - - checkWindowMultithreaded(win, 100000, 10, 1000); - - win.consistencyCheck(); - - finalChecks(win, 500); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedSizeSortedDequeueWindowUniqueMultithreaded() throws Exception { - StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>(); - - win.setMaximumSize(500); - win.setUnique(true); - - win.start(); - - checkWindowMultithreaded(win, 100000, 10, 1000); - - win.consistencyCheck(); - - finalChecks(win, 500); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedTimeDequeueWindowMultithreaded() throws Exception { - StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>(); - - win.setMaximumSize(500); - win.setTimeInterval(40); // 40ms time interval. - win.setUnique(false); - - win.start(); - - checkWindowMultithreaded(win, 100000, 10, 1000); - - win.consistencyCheck(); - - finalChecks(win, 500); - - U.sleep(1000); - - finalChecks(win, 0); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedTimeDequeueWindowUniqueMultithreaded() throws Exception { - StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>(); - - win.setMaximumSize(500); - win.setTimeInterval(40); // 40ms time interval. - win.setUnique(true); - - win.start(); - - checkWindowMultithreaded(win, 100000, 10, 1000); - - win.consistencyCheck(); - - finalChecks(win, 500); - - U.sleep(1000); - - finalChecks(win, 0); - } - - /** - * @throws Exception If failed. - */ - public void testBoundedTimeBatchDequeueWindowMultithreaded() throws Exception { - StreamerBoundedTimeBatchWindow<Integer> win = new StreamerBoundedTimeBatchWindow<>(); - - win.setMaximumBatches(10); - win.setBatchTimeInterval(100); - win.setBatchSize(50); - - win.start(); - - checkWindowMultithreaded(win, 100000, 10, 1000); - - win.consistencyCheck(); - - finalChecks(win, 500); - - U.sleep(1000); - - finalChecks(win, 0); - } - - /** - * Check iterator behaviour. - * - * @param win Window. - * @throws Exception If failed. - */ - private void checkIterator(StreamerWindow<Integer> win) throws Exception { - win.reset(); - - assert win.size() == 0; - - win.enqueue(1); - - assert win.size() == 1; - - final Iterator<Integer> iter = win.iterator(); - - win.enqueue(2); - - assert win.size() == 2; - - assert iter.hasNext(); - - GridTestUtils.assertThrows(log(), new Callable<Object>() { - @Override public Object call() throws Exception { - iter.remove(); - - return null; - } - }, IllegalStateException.class, null); - - assert iter.next() == 1; - - iter.remove(); - - assert !iter.hasNext(); - - GridTestUtils.assertThrows(log(), new Callable<Object>() { - @Override public Object call() throws Exception { - iter.next(); - - return null; - } - }, NoSuchElementException.class, null); - - assert win.size() == 1; - } - - /** - * Final checks. - * - * @param win Window to check. - * @param maxSize Max window size. - */ - private void finalChecks(StreamerWindow<Integer> win, int maxSize) { - int evictQueueSize = win.evictionQueueSize(); - - info("Eviction queue size for final checks: " + evictQueueSize); - - Collection<Integer> evicted = win.pollEvictedAll(); - - info("Evicted entries in final checks: " + evicted.size()); - - int winSize = win.size(); - - win.pollEvictedAll(); - - assertTrue("Unexpected window size [winSize=" + winSize + " maxSize=" + maxSize + ']', winSize <= maxSize); - } - - /** - * @param win Window to check. - * @param iterCnt Iteration count. - * @param threadCnt Thread count. - * @param range Range for key generation. - * @throws Exception If failed. - */ - private void checkWindowMultithreaded( - final StreamerWindow<Integer> win, - final int iterCnt, - int threadCnt, - final int range - ) throws Exception { - final AtomicInteger polled = new GridAtomicInteger(); - - final AtomicInteger added = new GridAtomicInteger(); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Random rnd = new Random(); - - for (int i = 0; i < iterCnt; i++) { - if (i > 0 && i % 10000 == 0) - info("Finished " + i + " iterations"); - - int op = rnd.nextInt(8); - - switch (op) { - case 0: { - // Add. - for (int j = 0; j < 30; j++) - win.enqueue(rnd.nextInt(range)); - - added.addAndGet(30); - - break; - } - - case 1: { - // Add bunch. - for (int j = 0; j < 10; j++) - win.enqueue(rnd.nextInt(range), rnd.nextInt(range), rnd.nextInt(range), - rnd.nextInt(range), rnd.nextInt(range), rnd.nextInt(range)); - - added.addAndGet(10 * 6); - - break; - } - - case 2: { - Object o = win.pollEvicted(); - - if (o != null) - polled.incrementAndGet(); - - break; - } - - case 3: { - Collection<Integer> p0 = win.pollEvicted(50); - - polled.addAndGet(p0.size()); - - break; - } - - case 4: { - Collection<Integer> p0 = win.pollEvictedBatch(); - - polled.addAndGet(p0.size()); - - break; - } - - case 5: { - Object o = win.dequeue(); - - if (o != null) - polled.incrementAndGet(); - - break; - } - - case 6: { - Collection<Integer> p0 = win.dequeue(50); - - polled.addAndGet(p0.size()); - - break; - } - - case 7: { - Iterator<Integer> it = win.iterator(); - - while (it.hasNext()) { - it.next(); - - if (rnd.nextInt(10) == 5) { - it.remove(); - - polled.incrementAndGet(); - } - } - - break; - } - } - } - - return null; - } - }, threadCnt); - - fut.get(); - - // Cannot assert on added, polled and window size because iterator does not return status. - info("Window size: " + win.size()); - info("Added=" + added.get() + ", polled=" + polled.get()); - } - - /** - * Check snapshto content. - * - * @param snapshot Snapshot. - * @param vals Expected values. - */ - private void checkSnapshot(Collection<Integer> snapshot, Object... vals) { - assert snapshot.size() == vals.length; - - int i = 0; - - for (Object evt : snapshot) - assertTrue(F.eq(evt, vals[i++])); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index 014f229..8ae4596 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -186,16 +186,6 @@ public class IgniteMock implements Ignite { } /** {@inheritDoc} */ - @Override public IgniteStreamer streamer(@Nullable String name) { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteStreamer> streamers() { - return null; - } - - /** {@inheritDoc} */ @Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 6091338..7ae237f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -70,9 +70,6 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(GridLifecycleAwareSelfTest.class); suite.addTestSuite(GridMessageListenSelfTest.class); - // Streamer. - suite.addTest(IgniteStreamerSelfTestSuite.suite()); - return suite; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamerSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamerSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamerSelfTestSuite.java deleted file mode 100644 index 0245b1c..0000000 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamerSelfTestSuite.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.testsuites; - -import junit.framework.*; -import org.apache.ignite.internal.processors.streamer.*; -import org.apache.ignite.streamer.index.*; -import org.apache.ignite.streamer.window.*; - -/** - * Streamer test suite. - */ -public class IgniteStreamerSelfTestSuite { - /** - * @return Test suite. - * @throws Exception If failed. - */ - public static TestSuite suite() throws Exception { - TestSuite suite = new TestSuite("Ignite Streamer Test Suite."); - - // Streamer. - suite.addTestSuite(GridStreamerWindowSelfTest.class); - suite.addTestSuite(GridStreamerEvictionSelfTest.class); - suite.addTestSuite(GridStreamerSelfTest.class); - suite.addTestSuite(GridStreamerFailoverSelfTest.class); - suite.addTestSuite(GridStreamerIndexSelfTest.class); - suite.addTestSuite(GridStreamerLifecycleAwareSelfTest.class); - - return suite; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java index 705fa27..1b9c4d2 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java @@ -135,13 +135,6 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea } /** {@inheritDoc} */ - @Override public Collection<IgniteStreamer> streamers() { - assert g != null; - - return g.streamers(); - } - - /** {@inheritDoc} */ @Override public IgniteCompute compute() { assert g != null; @@ -268,13 +261,6 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea } /** {@inheritDoc} */ - @Nullable @Override public IgniteStreamer streamer(@Nullable String name) { - assert g != null; - - return g.streamer(name); - } - - /** {@inheritDoc} */ @Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException { assert g != null;