http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java deleted file mode 100644 index ef7900d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.internal.processors.streamer.*; -import org.jdk8.backport.*; - -import java.util.*; - -/** - * Unbounded window which holds all events. Events can be evicted manually from window - * via any of the {@code dequeue(...)} methods. - */ -public class StreamerUnboundedWindow<E> extends StreamerWindowAdapter<E> { - /** Events. */ - private ConcurrentLinkedDeque8<E> evts = new ConcurrentLinkedDeque8<>(); - - /** {@inheritDoc} */ - @Override protected void stop0() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void checkConfiguration() { - // No-op. - } - - /** {@inheritDoc} */ - @Override protected void reset0() { - evts.clear(); - } - - /** {@inheritDoc} */ - @Override public int size() { - return evts.sizex(); - } - - /** {@inheritDoc} */ - @Override protected GridStreamerWindowIterator<E> iterator0() { - final ConcurrentLinkedDeque8.IteratorEx<E> it = (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator(); - - return new GridStreamerWindowIterator<E>() { - private E lastRet; - - @Override public boolean hasNext() { - return it.hasNext(); - } - - @Override public E next() { - lastRet = it.next(); - - return lastRet; - } - - @Override public E removex() { - return (it.removex()) ? lastRet : null; - } - }; - } - - /** {@inheritDoc} */ - @Override public int evictionQueueSize() { - return 0; - } - - /** {@inheritDoc} */ - @Override public boolean enqueue0(E evt) { - return evts.add(evt); - } - - /** {@inheritDoc} */ - @Override protected Collection<E> dequeue0(int cnt) { - Collection<E> res = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - E evicted = evts.pollLast(); - - if (evicted != null) - res.add(evicted); - else - break; - } - - return res; - } - - /** {@inheritDoc} */ - @Override protected Collection<E> pollEvicted0(int cnt) { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Override protected Collection<E> pollEvictedBatch0() { - return Collections.emptyList(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java deleted file mode 100644 index 963671a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java +++ /dev/null @@ -1,537 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.streamer.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.lifecycle.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.index.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Streamer window adapter. - */ -public abstract class StreamerWindowAdapter<E> implements LifecycleAware, StreamerWindow<E>, - StreamerWindowMBean { - /** Default window name. */ - private String name = getClass().getSimpleName(); - - /** Filter predicate. */ - private IgnitePredicate<Object> filter; - - /** Indexes. */ - private Map<String, StreamerIndexProvider<E, ?, ?>> idxsAsMap; - - /** */ - private StreamerIndexProvider<E, ?, ?>[] idxs; - - /** Lock for updates and snapshot. */ - private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock(); - - /** {@inheritDoc} */ - @Override public String getClassName() { - return U.compact(getClass().getName()); - } - - /** {@inheritDoc} */ - @Override public int getSize() { - return size(); - } - - /** {@inheritDoc} */ - @Override public int getEvictionQueueSize() { - return evictionQueueSize(); - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public String getName() { - return name; - } - - /** {@inheritDoc} */ - @Override public Iterator<E> iterator() { - return new BoundedIterator(iterator0()); - } - - /** - * Returns an iterator over a set of elements of type T without check for iteration limit. That is, - * in case concurrent thread constantly adding new elements to the window we could iterate forever. - * - * @return Iterator. - */ - protected abstract GridStreamerWindowIterator<E> iterator0(); - - /** {@inheritDoc} */ - @Override public boolean enqueue(E evt) { - lock.readLock(); - - try { - boolean res = (filter == null || filter.apply(evt)); - - if (res) { - updateIndexes(evt, false); - - if (!enqueue0(evt)) - updateIndexes(evt, true); - } - - return res; - } - finally { - lock.readUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public boolean enqueue(E... evts) { - return enqueueAll(Arrays.asList(evts)); - } - - /** {@inheritDoc} */ - @Override public boolean enqueueAll(Collection<E> evts) { - lock.readLock(); - - try { - boolean ignoreFilter = filter == null || F.isAlwaysTrue(filter); - - boolean res = true; - - for (E evt : evts) { - if (ignoreFilter || filter.apply(evt)) { - updateIndexes(evt, false); - - boolean added = enqueue0(evt); - - if (!added) - updateIndexes(evt, true); - - res &= added; - } - } - - return res; - } - finally { - lock.readUnlock(); - } - } - - /** - * Adds event to window. - * - * @param evt Event. - * @return {@code True} if event added. - */ - protected abstract boolean enqueue0(E evt); - - /** {@inheritDoc} */ - @Override public E dequeue() { - return F.first(dequeue(1)); - } - - /** {@inheritDoc} */ - @Override public Collection<E> dequeueAll() { - return dequeue(size()); - } - - /** {@inheritDoc} */ - @Override public Collection<E> dequeue(int cnt) { - lock.readLock(); - - try { - Collection<E> evts = dequeue0(cnt); - - if (!evts.isEmpty() && idxs != null) { - for (E evt : evts) - updateIndexes(evt, true); - } - - return evts; - } - finally { - lock.readUnlock(); - } - } - - /** - * Dequeues up to cnt elements from window. If current window size is less than cnt, will dequeue all elements - * from window. - * - * @param cnt Count. - * @return Dequeued elements. - */ - protected abstract Collection<E> dequeue0(int cnt); - - /** {@inheritDoc} */ - @Override public E pollEvicted() { - return F.first(pollEvicted(1)); - } - - /** {@inheritDoc} */ - @Override public Collection<E> pollEvictedAll() { - return pollEvicted(evictionQueueSize()); - } - - /** {@inheritDoc} */ - @Override public Collection<E> pollEvicted(int cnt) { - lock.readLock(); - - try { - Collection<E> evts = pollEvicted0(cnt); - - if (!evts.isEmpty() && idxs != null) { - for (E evt : evts) - updateIndexes(evt, true); - } - - return evts; - } - finally { - lock.readUnlock(); - } - } - - /** - * If window supports eviction, this method will return up to cnt evicted elements. - * - * @param cnt Count. - * @return Evicted elements. - */ - protected abstract Collection<E> pollEvicted0(int cnt); - - /** {@inheritDoc} */ - @Override public Collection<E> pollEvictedBatch() { - lock.readLock(); - - try { - Collection<E> evts = pollEvictedBatch0(); - - if (!evts.isEmpty() && idxs != null) { - for (E evt : evts) - updateIndexes(evt, true); - } - - return evts; - } - finally { - lock.readUnlock(); - } - } - - /** - * If window supports batch eviction, this method will poll next evicted batch from window. If windows does not - * support batch eviction but supports eviction, will return collection of single last evicted element. If window - * does not support eviction, will return empty collection. - * - * @return Elements from evicted batch. - */ - protected abstract Collection<E> pollEvictedBatch0(); - - /** {@inheritDoc} */ - @Override public final void start() { - checkConfiguration(); - - if (idxs != null) { - for (StreamerIndexProvider<E, ?, ?> idx : idxs) - idx.initialize(); - } - - reset(); - } - - /** {@inheritDoc} */ - @Override public final void reset(){ - lock.writeLock(); - - try { - if (idxs != null) { - for (StreamerIndexProvider<E, ?, ?> idx : idxs) - idx.reset(); - } - - reset0(); - } - finally { - lock.writeUnlock(); - } - } - - /** - * Check window configuration. - * - * @throws IgniteException If failed. - */ - protected abstract void checkConfiguration() throws IgniteException; - - /** - * Reset routine. - */ - protected abstract void reset0(); - - /** {@inheritDoc} */ - @Override public void stop() { - lock.writeLock(); - - try { - stop0(); - } - finally { - lock.writeUnlock(); - } - } - - /** - * Dispose window. - */ - protected abstract void stop0(); - - /** {@inheritDoc} */ - @Override public Collection<E> snapshot(boolean includeEvicted) { - lock.writeLock(); - - try { - int skip = includeEvicted ? 0 : evictionQueueSize(); - - List<E> res = new ArrayList<>(size() - skip); - - Iterator<E> iter = iterator(); - - int i = 0; - - while (iter.hasNext()) { - E next = iter.next(); - - if (i++ >= skip) - res.add(next); - } - - return Collections.unmodifiableList(res); - } - finally { - lock.writeUnlock(); - } - } - - /** - * Sets window name. - * - * @param name Window name. - */ - public void setName(String name) { - this.name = name; - } - - /** - * Gets optional event filter. - * - * @return Optional event filter. - */ - @Nullable public IgnitePredicate<Object> getFilter() { - return filter; - } - - /** - * Sets event filter. - * - * @param filter Event filter. - */ - public void setFilter(@Nullable IgnitePredicate<Object> filter) { - this.filter = filter; - } - - /** {@inheritDoc} */ - @Override public <K, V> StreamerIndex<E, K, V> index() { - return index(null); - } - - /** {@inheritDoc} */ - @Override public <K, V> StreamerIndex<E, K, V> index(@Nullable String name) { - if (idxsAsMap != null) { - StreamerIndexProvider<E, K, V> idx = (StreamerIndexProvider<E, K, V>)idxsAsMap.get(name); - - if (idx == null) - throw new IllegalArgumentException("Streamer index is not configured: " + name); - - return idx.index(); - } - - throw new IllegalArgumentException("Streamer index is not configured: " + name); - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerIndex<E, ?, ?>> indexes() { - if (idxs != null) { - Collection<StreamerIndex<E, ?, ?>> res = new ArrayList<>(idxs.length); - - for (StreamerIndexProvider<E, ?, ?> idx : idxs) - res.add(idx.index()); - - return res; - } - else - return Collections.emptyList(); - } - - /** - * Get array of index providers. - * - * @return Index providers. - */ - public StreamerIndexProvider<E, ?, ?>[] indexProviders() { - return idxs; - } - - /** - * Set indexes. - * - * @param idxs Indexes. - * @throws IllegalArgumentException If some index names are not unique. - */ - @SuppressWarnings("unchecked") - public void setIndexes(StreamerIndexProvider<E, ?, ?>... idxs) throws IllegalArgumentException { - A.ensure(!F.isEmpty(idxs), "!F.isEmpty(idxs)"); - - idxsAsMap = new HashMap<>(idxs.length, 1.0f); - this.idxs = new StreamerIndexProvider[idxs.length]; - - int i = 0; - - for (StreamerIndexProvider<E, ?, ?> idx : idxs) { - StreamerIndexProvider<E, ?, ?> old = idxsAsMap.put(idx.getName(), idx); - - if (old != null) - throw new IllegalArgumentException("Index name is not unique [idx1=" + old + ", idx2=" + idx + ']'); - - this.idxs[i++] = idx; - } - } - - /** {@inheritDoc} */ - @Override public void clearEvicted() { - pollEvictedAll(); - } - - /** - * Update indexes. - * - * @param evt Event. - * @param rmv Remove flag. - * @throws IgniteException If index update failed. - */ - protected void updateIndexes(E evt, boolean rmv) throws IgniteException { - if (idxs != null) { - StreamerIndexUpdateSync sync = new StreamerIndexUpdateSync(); - - boolean rollback = true; - - try { - for (StreamerIndexProvider<E, ?, ?> idx : idxs) { - if (rmv) - idx.remove(sync, evt); - else - idx.add(sync, evt); - } - - rollback = false; - } - finally { - for (StreamerIndexProvider<E, ?, ?> idx : idxs) - idx.endUpdate(sync, evt, rollback, rmv); - - sync.finish(1); - } - } - } - - /** - * Window iterator wrapper which prevent returning more elements that existed in the underlying collection by the - * time of iterator creation. - */ - private class BoundedIterator implements Iterator<E> { - /** Iterator. */ - private final GridStreamerWindowIterator<E> iter; - - /** How many elements to return left (at most). */ - private int left; - - /** - * Constructor. - * - * @param iter Iterator. - */ - private BoundedIterator(GridStreamerWindowIterator<E> iter) { - assert iter != null; - assert lock != null; - - this.iter = iter; - - left = size(); - } - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - return left > 0 && iter.hasNext(); - } - - /** {@inheritDoc} */ - @Override public E next() { - left--; - - if (left < 0) - throw new NoSuchElementException(); - - return iter.next(); - } - - /** {@inheritDoc} */ - @Override public void remove() { - if (left < 0) - throw new IllegalStateException(); - - lock.readLock(); - - try { - E evt = iter.removex(); - - if (evt != null) { - try { - updateIndexes(evt, true); - } - catch (IgniteException e) { - throw new IgniteException("Failed to remove event: " + evt, e); - } - } - } - finally { - lock.readUnlock(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/package-info.java deleted file mode 100644 index 7d50c46..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains streamer window implementations. - */ -package org.apache.ignite.streamer.window; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerEvictionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerEvictionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerEvictionSelfTest.java deleted file mode 100644 index 8985a91..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerEvictionSelfTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.window.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; - -/** - * Tests for streamer eviction logic. - */ -public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of events used in test. */ - private static final int EVENTS_COUNT = 10; - - /** Test stages. */ - private Collection<StreamerStage> stages; - - /** Event router. */ - private StreamerEventRouter router; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setStreamerConfiguration(streamerConfiguration()); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setMarshaller(new OptimizedMarshaller(false)); - - return cfg; - } - - /** - * @return Streamer configuration. - */ - private StreamerConfiguration streamerConfiguration() { - StreamerConfiguration cfg = new StreamerConfiguration(); - - cfg.setRouter(router); - - StreamerBoundedTimeWindow window = new StreamerBoundedTimeWindow(); - - window.setName("window1"); - window.setTimeInterval(60000); - - cfg.setWindows(F.asList((StreamerWindow)window)); - - cfg.setStages(stages); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testContextNextStage() throws Exception { - router = new GridTestStreamerEventRouter(); - - final CountDownLatch finishLatch = new CountDownLatch(EVENTS_COUNT); - final AtomicReference<AssertionError> err = new AtomicReference<>(); - - SC stage = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) { - assert evts.size() == 1; - - if (ctx.nextStageName() == null) { - finishLatch.countDown(); - - return null; - } - - StreamerWindow win = ctx.window("window1"); - - // Add new events to the window. - win.enqueueAll(evts); - - try { - assertEquals(0, win.evictionQueueSize()); - } - catch (AssertionError e) { - err.compareAndSet(null, e); - } - - // Evict outdated events from the window. - Collection evictedEvts = win.pollEvictedAll(); - - try { - assertEquals(0, evictedEvts.size()); - } - catch (AssertionError e) { - err.compareAndSet(null, e); - } - - Integer val = (Integer)F.first(evts); - - return (Map)F.asMap(ctx.nextStageName(), F.asList(++val)); - } - }; - - stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage)); - - startGrids(2); - - try { - GridTestStreamerEventRouter router = (GridTestStreamerEventRouter)this.router; - - router.put("0", grid(0).localNode().id()); - router.put("1", grid(1).localNode().id()); - - for (int i = 0; i < EVENTS_COUNT; i++) - grid(0).streamer(null).addEvent(i); - - boolean await = finishLatch.await(5, SECONDS); - - if (err.get() != null) - throw err.get(); - - if (!await) - fail("Some events didn't finished."); - } - finally { - stopAllGrids(false); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.java deleted file mode 100644 index a7f99d8..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.window.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * - */ -public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Event router. */ - private TestRandomRouter router; - - /** Maximum number of concurrent sessions for test. */ - private int maxConcurrentSess; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setStreamerConfiguration(streamerConfiguration()); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setPeerClassLoadingEnabled(false); - - return cfg; - } - - /** - * @return Streamer configuration. - */ - private StreamerConfiguration streamerConfiguration() { - StreamerConfiguration cfg = new StreamerConfiguration(); - - cfg.setAtLeastOnce(true); - - cfg.setRouter(router); - - StreamerBoundedSizeWindow window = new StreamerBoundedSizeWindow(); - - window.setMaximumSize(100); - - cfg.setWindows(F.asList((StreamerWindow)window)); - - cfg.setMaximumConcurrentSessions(maxConcurrentSess); - - SC pass = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> objects) { - assert ctx.nextStageName() != null; - - // Pass to next stage. - return (Map)F.asMap(ctx.nextStageName(), objects); - } - }; - - SC put = new SC() { - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) { - ConcurrentMap<Object, AtomicInteger> cntrs = ctx.localSpace(); - - for (Object evt : evts) { - AtomicInteger cnt = cntrs.get(evt); - - if (cnt == null) - cnt = F.addIfAbsent(cntrs, evt, new AtomicInteger()); - - cnt.incrementAndGet(); - } - - return null; - } - }; - - cfg.setStages(F.asList((StreamerStage)new GridTestStage("pass", pass), new GridTestStage("put", put))); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testEventFailover() throws Exception { - checkEventFailover(500); - } - - /** - * @throws Exception If failed. - */ - private void checkEventFailover(int max) throws Exception { - router = new TestRandomRouter(); - maxConcurrentSess = max; - - startGrids(6); - - try { - router.sourceNodeId(grid(0).localNode().id()); - router.destinationNodeId(grid(5).localNode().id()); - - final AtomicBoolean done = new AtomicBoolean(false); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Random rnd = new Random(); - - while (!done.get()) { - // Pick a random grid to restart. - int idx = rnd.nextInt(4) + 1; - - info(">>>>> Stopping grid " + grid(idx).localNode().id()); - - stopGrid(idx, true); - - U.sleep(1000); - - startGrid(idx); - - info(">>>>>> Started grid " + grid(idx).localNode().id()); - - U.sleep(500); - } - - return null; - } - }, 1, "restarter"); - - final Collection<Object> failed = new ConcurrentLinkedQueue<>(); - - IgniteStreamer streamer = grid(0).streamer(null); - - streamer.addStreamerFailureListener(new StreamerFailureListener() { - @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) { - info("Unable to failover events [stageName=" + stageName + ", err=" + err + ']'); - - failed.addAll(evts); - } - }); - - final int evtsCnt = 300000; - - // Now we are ready to process events. - for (int i = 0; i < evtsCnt; i++) { - if (i > 0 && i % 10000 == 0) - info("Processed: " + i); - - streamer.addEvent(i); - } - - done.set(true); - - fut.get(); - - // Do not cancel and wait for all tasks to finish. - G.stop(getTestGridName(0), false); - - ConcurrentMap<Integer, AtomicInteger> finSpace = grid(5).streamer(null).context().localSpace(); - - for (int i = 0; i < evtsCnt; i++) { - AtomicInteger cnt = finSpace.get(i); - - if (cnt == null) { - assertTrue("Missing counter for key both in result map and in failover failed map: " + i, - failed.contains(i)); - } - else - assertTrue(cnt.get() > 0); - } - } - finally { - stopAllGrids(); - } - } - - /** - * Test random router. - */ - private static class TestRandomRouter extends StreamerEventRouterAdapter { - /** Source node ID. */ - private UUID srcNodeId; - - /** Destination node ID. */ - private UUID destNodeId; - - /** {@inheritDoc} */ - @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) { - if ("put".equals(stageName)) - return ctx.projection().node(destNodeId); - - // Route to random node different from srcNodeId. - Collection<ClusterNode> nodes = ctx.projection().forPredicate(new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode n) { - return !srcNodeId.equals(n.id()) && !destNodeId.equals(n.id()); - } - }).nodes(); - - int idx = ThreadLocalRandom8.current().nextInt(nodes.size()); - - int i = 0; - - Iterator<ClusterNode> iter = nodes.iterator(); - - while (true) { - if (!iter.hasNext()) - iter = nodes.iterator(); - - ClusterNode node = iter.next(); - - if (idx == i++) - return node; - } - } - - /** - * @param srcNodeId New source node ID. - */ - public void sourceNodeId(UUID srcNodeId) { - this.srcNodeId = srcNodeId; - } - - /** - * @param destNodeId New destination node ID. - */ - public void destinationNodeId(UUID destNodeId) { - this.destNodeId = destNodeId; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java deleted file mode 100644 index 4141948..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.index.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Test for {@link org.apache.ignite.lifecycle.LifecycleAware} support in {@link org.apache.ignite.streamer.StreamerConfiguration}. - */ -public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwareSelfTest { - /** - */ - private static class TestEventRouter extends TestLifecycleAware implements StreamerEventRouter { - /** - */ - TestEventRouter() { - super(null); - } - - /** {@inheritDoc} */ - @Nullable @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, - String stageName, Collection<T> evts) { - return null; - } - } - - /** - */ - private static class TestStage extends TestLifecycleAware implements StreamerStage { - /** - */ - TestStage() { - super(null); - } - - /** {@inheritDoc} */ - @Override public String name() { - return "dummy"; - } - - /** {@inheritDoc} */ - @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) { - return null; - } - } - - /** - */ - private static class TestWindow extends TestLifecycleAware implements StreamerWindow { - /** - */ - TestWindow() { - super(null); - } - - /** {@inheritDoc} */ - @Override public String name() { - return "dummy"; - } - - /** {@inheritDoc} */ - @Nullable @Override public StreamerIndex index() { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public StreamerIndex index(@Nullable String name) { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerIndex> indexes() { - return null; - } - - /** {@inheritDoc} */ - @Override public void reset() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int size() { - return 0; - } - - /** {@inheritDoc} */ - @Override public int evictionQueueSize() { - return 0; - } - - /** {@inheritDoc} */ - @Override public boolean enqueue(Object evt) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean enqueue(Object... evts) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean enqueueAll(Collection evts) { - return false; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object dequeue() { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection dequeue(int cnt) { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection dequeueAll() { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object pollEvicted() { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection pollEvicted(int cnt) { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection pollEvictedBatch() { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection pollEvictedAll() { - return null; - } - - /** {@inheritDoc} */ - @Override public void clearEvicted() { - } - - /** {@inheritDoc} */ - @Override public Collection snapshot(boolean includeIvicted) { - return null; - } - - /** {@inheritDoc} */ - @Override public Iterator iterator() { - return null; - } - } - - /** {@inheritDoc} */ - @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - StreamerConfiguration streamerCfg = new StreamerConfiguration(); - - TestEventRouter router = new TestEventRouter(); - - streamerCfg.setRouter(router); - - lifecycleAwares.add(router); - - TestStage stage = new TestStage(); - - streamerCfg.setStages(F.asList((StreamerStage)stage)); - - lifecycleAwares.add(stage); - - TestWindow window = new TestWindow(); - - streamerCfg.setWindows(F.asList((StreamerWindow)window)); - - lifecycleAwares.add(window); - - cfg.setStreamerConfiguration(streamerCfg); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerSelfTest.java deleted file mode 100644 index 19b9c67..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerSelfTest.java +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.router.*; -import org.apache.ignite.streamer.window.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.config.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.configuration.DeploymentMode.*; - -/** - * Basic streamer test. - */ -public class GridStreamerSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private boolean atLeastOnce = true; - - /** Test stages. */ - private Collection<StreamerStage> stages; - - /** Event router. */ - private StreamerEventRouter router; - - /** P2P enabled flag. */ - private boolean p2pEnabled; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setStreamerConfiguration(streamerConfiguration()); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setPeerClassLoadingEnabled(p2pEnabled); - - if (p2pEnabled) - cfg.setDeploymentMode(SHARED); - - cfg.setMarshaller(new OptimizedMarshaller(false)); - - return cfg; - } - - /** - * @return Streamer configuration. - */ - private StreamerConfiguration streamerConfiguration() { - StreamerConfiguration cfg = new StreamerConfiguration(); - - cfg.setAtLeastOnce(atLeastOnce); - - cfg.setRouter(router); - - cfg.setWindows(F.asList((StreamerWindow)new StreamerUnboundedWindow())); - - cfg.setStages(stages); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testInjections() throws Exception { - final int evtCnt = 100; - - final CountDownLatch finishLatch = new CountDownLatch(evtCnt); - - stages = F.<StreamerStage>asList(new StreamerStage() { - @IgniteInstanceResource - private Ignite g; - - @LoggerResource - private IgniteLogger log; - - @Override public String name() { - return "name"; - } - - @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) { - assert g != null; - assert log != null; - - log.info("Processing events: " + evts); - - finishLatch.countDown(); - - return null; - } - }); - - try { - final Ignite ignite0 = startGrid(0); - - IgniteStreamer streamer = ignite0.streamer(null); - - for (int i = 0; i < evtCnt; i++) - streamer.addEvent("event1"); - - assert finishLatch.await(10, SECONDS); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testStreamerMetrics() throws Exception { - atLeastOnce = true; - p2pEnabled = false; - router = new GridTestStreamerEventRouter(); - - final int evtCnt = 100; - - final CountDownLatch finishLatch = new CountDownLatch(evtCnt); - - SC stage = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) - throws IgniteCheckedException { - String nextStage = ctx.nextStageName(); - - U.sleep(50); - - if (nextStage == null) { - finishLatch.countDown(); - - return null; - } - - return (Map)F.asMap(nextStage, evts); - } - }; - - stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage), - new GridTestStage("c", stage)); - - startGrids(4); - - try { - final Ignite ignite0 = grid(0); - final Ignite ignite1 = grid(1); - final Ignite ignite2 = grid(2); - final Ignite ignite3 = grid(3); - - System.out.println("Grid 0: " + ignite0.cluster().localNode().id()); - System.out.println("Grid 1: " + ignite1.cluster().localNode().id()); - System.out.println("Grid 2: " + ignite2.cluster().localNode().id()); - System.out.println("Grid 3: " + ignite3.cluster().localNode().id()); - - GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router; - - router0.put("a", ignite1.cluster().localNode().id()); - router0.put("b", ignite2.cluster().localNode().id()); - router0.put("c", ignite3.cluster().localNode().id()); - - IgniteStreamer streamer = ignite0.streamer(null); - - for (int i = 0; i < evtCnt; i++) - streamer.addEvent("event1"); - - finishLatch.await(); - - // No stages should be executed on grid0. - checkZeroMetrics(ignite0, "a", "b", "c"); - checkZeroMetrics(ignite1, "b", "c"); - checkZeroMetrics(ignite2, "a", "c"); - checkZeroMetrics(ignite3, "a", "b"); - - checkMetrics(ignite1, "a", evtCnt, false); - checkMetrics(ignite2, "b", evtCnt, false); - checkMetrics(ignite3, "c", evtCnt, true); - - // Wait until all acks are received. - GridTestUtils.retryAssert(log, 100, 50, new CA() { - @Override public void apply() { - StreamerMetrics metrics = ignite0.streamer(null).metrics(); - - assertEquals(0, metrics.currentActiveSessions()); - } - }); - - StreamerMetrics metrics = ignite0.streamer(null).metrics(); - - assertTrue(metrics.maximumActiveSessions() > 0); - - ignite0.streamer(null).context().query(new IgniteClosure<StreamerContext, Object>() { - @Override public Object apply(StreamerContext ctx) { - try { - U.sleep(1000); - } - catch (IgniteInterruptedCheckedException ignore) { - // No-op. - } - - return null; - } - }); - - metrics = ignite0.streamer(null).metrics(); - - assert metrics.queryMaximumExecutionNodes() == 4; - assert metrics.queryMinimumExecutionNodes() == 4; - assert metrics.queryAverageExecutionNodes() == 4; - - assert metrics.queryMaximumExecutionTime() > 0; - assert metrics.queryMinimumExecutionTime() > 0; - assert metrics.queryAverageExecutionTime() > 0; - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testContextNextStage() throws Exception { - atLeastOnce = true; - router = new GridTestStreamerEventRouter(); - p2pEnabled = false; - - final CountDownLatch finishLatch = new CountDownLatch(1); - final AtomicReference<IgniteCheckedException> err = new AtomicReference<>(); - - SC stage = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) { - String nextStage = ctx.nextStageName(); - - if (nextStage == null) { - finishLatch.countDown(); - - return null; - } - - assert evts.size() == 1; - - Integer val = (Integer)F.first(evts); - - val++; - - if (!String.valueOf(val).equals(ctx.nextStageName())) - err.compareAndSet(null, new IgniteCheckedException("Stage name comparison failed [exp=" + val + - ", actual=" + ctx.nextStageName() + ']')); - - return (Map)F.asMap(ctx.nextStageName(), F.asList(val)); - } - }; - - stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage), - new GridTestStage("2", stage), new GridTestStage("3", stage), new GridTestStage("4", stage)); - - startGrids(4); - - try { - GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router; - - router0.put("0", grid(1).localNode().id()); - router0.put("1", grid(2).localNode().id()); - router0.put("2", grid(3).localNode().id()); - router0.put("3", grid(0).localNode().id()); - router0.put("4", grid(1).localNode().id()); - - grid(0).streamer(null).addEvent(0); - - finishLatch.await(); - - if (err.get() != null) - throw err.get(); - } - finally { - stopAllGrids(false); - } - } - - /** - * @throws Exception If failed. - */ - public void testAddEventWithNullStageName() throws Exception { - atLeastOnce = true; - router = new GridTestStreamerEventRouter(); - p2pEnabled = false; - - SC stage = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) { - String nextStage = ctx.nextStageName(); - - if (nextStage == null) - return null; - - Integer val = (Integer)F.first(evts); - - return (Map)F.asMap(ctx.nextStageName(), F.asList(++val)); - } - }; - - stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage)); - - startGrids(2); - - try { - GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router; - - router0.put("0", grid(0).localNode().id()); - router0.put("1", grid(1).localNode().id()); - - try { - grid(0).streamer(null).addEventToStage(null, 0); - - fail(); - } - catch (NullPointerException e) { - assertTrue(e.getMessage().contains("Argument cannot be null: stageName")); - - info("Caught expected exception: " + e.getMessage()); - } - - try { - grid(0).streamer(null).addEventsToStage(null, Collections.singletonList(0)); - - fail(); - } - catch (NullPointerException e) { - assertTrue(e.getMessage().contains("Argument cannot be null: stageName")); - - info("Caught expected exception: " + e.getMessage()); - } - } - finally { - stopAllGrids(false); - } - } - - /** - * @throws Exception If failed. - */ - public void testNullStageNameInResultMap() throws Exception { - atLeastOnce = true; - router = new GridTestStreamerEventRouter(); - p2pEnabled = false; - - SC stage = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) { - String nextStage = ctx.nextStageName(); - - if (nextStage == null) - return null; - - Integer val = (Integer)F.first(evts); - - Map<String, Collection<?>> res = new HashMap<>(); - - res.put(null, F.asList(++val)); - - return res; - } - }; - - stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage)); - - startGrids(2); - - try { - GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router; - - final CountDownLatch errLatch = new CountDownLatch(1); - - grid(0).streamer(null).addStreamerFailureListener(new StreamerFailureListener() { - @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) { - info("Expected failure: " + err.getMessage()); - - errLatch.countDown(); - } - }); - - router0.put("0", grid(0).localNode().id()); - router0.put("1", grid(1).localNode().id()); - - grid(0).streamer(null).addEvent(0); - - assert errLatch.await(5, TimeUnit.SECONDS); - } - finally { - stopAllGrids(false); - } - } - - /** - * @throws Exception If failed. - */ - public void testPeerDeployment() throws Exception { - URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; - - GridTestExternalClassLoader ldr = new GridTestExternalClassLoader(urls); - - Class<?> cls = ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestKey"); - - assert cls != null; - - final int evtCnt = 100; - - final CountDownLatch finishLatch = new CountDownLatch(evtCnt); - - SC stage = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) { - String nextStage = ctx.nextStageName(); - - ctx.window().enqueueAll(evts); - - if (nextStage == null) { - finishLatch.countDown(); - - return null; - } - - return (Map)F.asMap(nextStage, evts); - } - }; - - stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage), - new GridTestStage("c", stage)); - router = new GridTestStreamerEventRouter(); - atLeastOnce = true; - p2pEnabled = true; - - startGrids(4); - - try { - final Ignite ignite0 = grid(0); - final Ignite ignite1 = grid(1); - final Ignite ignite2 = grid(2); - final Ignite ignite3 = grid(3); - - System.out.println("Grid 0: " + ignite0.cluster().localNode().id()); - System.out.println("Grid 1: " + ignite1.cluster().localNode().id()); - System.out.println("Grid 2: " + ignite2.cluster().localNode().id()); - System.out.println("Grid 3: " + ignite3.cluster().localNode().id()); - - GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router; - - router0.put("a", ignite1.cluster().localNode().id()); - router0.put("b", ignite2.cluster().localNode().id()); - router0.put("c", ignite3.cluster().localNode().id()); - - IgniteStreamer streamer = ignite0.streamer(null); - - for (int i = 0; i < evtCnt; i++) - streamer.addEvent(cls.newInstance()); - - // Wait for all events to be processed. - finishLatch.await(); - - for (int i = 1; i < 4; i++) - assertEquals(evtCnt, grid(i).streamer(null).context().window().size()); - - // Check undeploy. - stopGrid(0, false); - - GridTestUtils.retryAssert(log, 50, 50, new CA() { - @Override public void apply() { - for (int i = 1; i < 4; i++) - assertEquals(0, grid(i).streamer(null).context().window().size()); - } - }); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testQuery() throws Exception { - atLeastOnce = true; - router = new StreamerRandomEventRouter(); - p2pEnabled = false; - - final int evtCnt = 1000; - - final CountDownLatch finishLatch = new CountDownLatch(evtCnt); - - SC stage = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) { - ConcurrentMap<String, AtomicInteger> space = ctx.localSpace(); - - AtomicInteger cntr = space.get(stageName); - - if (cntr == null) - cntr = F.addIfAbsent(space, stageName, new AtomicInteger()); - - for (Object val : evts) - cntr.addAndGet((Integer)val); - - String next = ctx.nextStageName(); - - if (next == null) { - finishLatch.countDown(); - - return null; - } - - return (Map)F.asMap(next, evts); - } - }; - - stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage), - new GridTestStage("c", stage), new GridTestStage("d", stage)); - - startGrids(4); - - try { - int sum = 0; - - int range = 1000; - - Random rnd = new Random(); - - for (int i = 0; i < evtCnt; i++) { - int val = rnd.nextInt(range); - - grid(0).streamer(null).addEvent(val); - - sum += val; - } - - finishLatch.await(); - - Map<String, Integer> stagesSum = new HashMap<>(4); - - final String[] stages = {"a", "b", "c", "d"}; - - // Check all stages local map. - for (int i = 0; i < 4; i++) { - Ignite ignite = grid(i); - - ConcurrentMap<String, AtomicInteger> locSpace = ignite.streamer(null).context().localSpace(); - - for (String stageName : stages) { - AtomicInteger val = locSpace.get(stageName); - - assertNotNull(val); - - info(">>>>> grid=" + ignite.cluster().localNode().id() + ", s=" + stageName + ", val=" + val.get()); - - Integer old = stagesSum.get(stageName); - - if (old == null) - stagesSum.put(stageName, val.get()); - else - stagesSum.put(stageName, old + val.get()); - } - } - - for (String s : stages) - assertEquals((Integer)sum, stagesSum.get(s)); - - StreamerContext streamerCtx = grid(0).streamer(null).context(); - - // Check query. - for (final String s : stages) { - Collection<Integer> res = streamerCtx.query(new C1<StreamerContext, Integer>() { - @Override public Integer apply(StreamerContext ctx) { - AtomicInteger cntr = ctx.<String, AtomicInteger>localSpace().get(s); - - return cntr.get(); - } - }); - - assertEquals(sum, F.sumInt(res)); - } - - // Check broadcast. - streamerCtx.broadcast(new CI1<StreamerContext>() { - @Override public void apply(StreamerContext ctx) { - int sum = 0; - - ConcurrentMap<String, AtomicInteger> space = ctx.localSpace(); - - for (String s : stages) { - AtomicInteger cntr = space.get(s); - - sum += cntr.get(); - } - - space.put("bcast", new AtomicInteger(sum)); - } - }); - - int bcastSum = 0; - - for (int i = 0; i < 4; i++) { - Ignite ignite = grid(i); - - ConcurrentMap<String, AtomicInteger> locSpace = ignite.streamer(null).context().localSpace(); - - bcastSum += locSpace.get("bcast").get(); - } - - assertEquals(sum * stages.length, bcastSum); - - // Check reduce. - for (final String s : stages) { - Integer res = streamerCtx.reduce( - new C1<StreamerContext, Integer>() { - @Override public Integer apply(StreamerContext ctx) { - AtomicInteger cntr = ctx.<String, AtomicInteger>localSpace().get(s); - - return cntr.get(); - } - }, - F.sumIntReducer()); - - assertEquals((Integer)sum, res); - } - } - finally { - stopAllGrids(false); - } - } - - /** - * @throws Exception If failed. - */ - public void testRandomRouterWithEmptyTopology() throws Exception { - atLeastOnce = true; - router = new StreamerRandomEventRouter(new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return false; - } - }); - p2pEnabled = false; - - SC stage = new SC() { - @SuppressWarnings("unchecked") - @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx, - Collection<Object> evts) { - return ctx.nextStageName() == null ? null : (Map)F.asMap(ctx.nextStageName(), F.asList(0)); - } - }; - - stages = F.asList((StreamerStage)new GridTestStage("0", stage),new GridTestStage("1", stage), - new GridTestStage("2", stage)); - - startGrids(1); - - try { - final int errCnt = 10; - - final CountDownLatch errLatch = new CountDownLatch(errCnt); - - grid(0).streamer(null).addStreamerFailureListener(new StreamerFailureListener() { - @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) { - info("Expected failure: " + err.getMessage()); - - errLatch.countDown(); - } - }); - - for (int i = 0; i < errCnt; i++) - grid(0).streamer(null).addEvent(0); - - assert errLatch.await(5, TimeUnit.SECONDS); - } - finally { - stopAllGrids(false); - } - } - - /** - * @param ignite Grid to check metrics on. - * @param stage Stage name. - * @param evtCnt Event count. - * @param pipeline Pipeline. - */ - private void checkMetrics(Ignite ignite, String stage, int evtCnt, boolean pipeline) { - IgniteStreamer streamer = ignite.streamer(null); - - StreamerMetrics metrics = streamer.metrics(); - - assertEquals(evtCnt, metrics.stageTotalExecutionCount()); - assertEquals(0, metrics.stageWaitingExecutionCount()); - assertEquals(0, metrics.currentActiveSessions()); - assertEquals(0, metrics.maximumActiveSessions()); - assertEquals(0, metrics.failuresCount()); - - if (pipeline) { - assertEquals(4, metrics.pipelineMaximumExecutionNodes()); - assertEquals(4, metrics.pipelineMinimumExecutionNodes()); - assertEquals(4, metrics.pipelineAverageExecutionNodes()); - - assertTrue(metrics.pipelineMaximumExecutionTime() > 0); - assertTrue(metrics.pipelineMinimumExecutionTime() > 0); - assertTrue(metrics.pipelineAverageExecutionTime() > 0); - } - else { - assertEquals(0, metrics.pipelineMaximumExecutionNodes()); - assertEquals(0, metrics.pipelineMinimumExecutionNodes()); - assertEquals(0, metrics.pipelineAverageExecutionNodes()); - - assertEquals(0, metrics.pipelineMaximumExecutionTime()); - assertEquals(0, metrics.pipelineMinimumExecutionTime()); - assertEquals(0, metrics.pipelineAverageExecutionTime()); - } - - StreamerStageMetrics stageMetrics = streamer.metrics().stageMetrics(stage); - - assertNotNull(stageMetrics); - - assertTrue(stageMetrics.averageExecutionTime() > 0); - assertTrue(stageMetrics.minimumExecutionTime() > 0); - assertTrue(stageMetrics.maximumExecutionTime() > 0); - assertEquals(evtCnt, stageMetrics.totalExecutionCount()); - assertEquals(0, stageMetrics.failuresCount()); - assertFalse(stageMetrics.executing()); - } - - /** - * @param ignite Grid to check streamer on. - * @param stages Stages to check. - */ - private void checkZeroMetrics(Ignite ignite, String... stages) { - for (String stage : stages) { - IgniteStreamer streamer = ignite.streamer(null); - - StreamerStageMetrics metrics = streamer.metrics().stageMetrics(stage); - - assertNotNull(metrics); - - assertEquals(0, metrics.failuresCount()); - assertEquals(0, metrics.averageExecutionTime()); - assertEquals(0, metrics.minimumExecutionTime()); - assertEquals(0, metrics.maximumExecutionTime()); - assertFalse(metrics.executing()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStage.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStage.java deleted file mode 100644 index b4968c8..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStage.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.streamer.*; - -import java.util.*; - -/** - * Test stage. - */ -class GridTestStage implements StreamerStage<Object> { - /** Stage name. */ - private String name; - - /** Stage closure. */ - private SC stageClos; - - /** - * @param name Stage name. - * @param stageClos Stage closure to execute. - */ - GridTestStage(String name, SC stageClos) { - this.name = name; - this.stageClos = stageClos; - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts) { - return stageClos.apply(name(), ctx, evts); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStreamerEventRouter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStreamerEventRouter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStreamerEventRouter.java deleted file mode 100644 index ac81086..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStreamerEventRouter.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.streamer.*; - -import java.util.*; - -/** - * Test router. - */ -class GridTestStreamerEventRouter extends StreamerEventRouterAdapter { - /** Route table. */ - private Map<String, UUID> routeTbl = new HashMap<>(); - - /** - * @param stageName Stage name. - * @param nodeId Node id. - */ - public void put(String stageName, UUID nodeId) { - routeTbl.put(stageName, nodeId); - } - - /** {@inheritDoc} */ - @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) { - UUID nodeId = routeTbl.get(stageName); - - if (nodeId == null) - return null; - - return ctx.projection().node(nodeId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/SC.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/SC.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/SC.java deleted file mode 100644 index ff457dd..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/SC.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.streamer.*; - -import java.util.*; - -/** - * Typedef for generic closure. - */ -abstract class SC - extends GridClosure3X<String, StreamerContext, Collection<Object>, Map<String, Collection<?>>> { - // No-op. -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java deleted file mode 100644 index a2e2a04..0000000 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; - -import java.util.*; - -/** - * Closure for events generation. - */ -class EventClosure implements IgniteInClosure<IgniteStreamer> { - /** Random range. */ - private int rndRange = 100; - - /** {@inheritDoc} */ - @Override public void apply(IgniteStreamer streamer) { - Random rnd = new Random(); - - while (!Thread.interrupted()) { - try { - streamer.addEvent(rnd.nextInt(rndRange)); - } - catch (IgniteException e) { - X.println("Failed to add streamer event: " + e); - } - } - } - - /** - * @return Random range. - */ - public int getRandomRange() { - return rndRange; - } - - /** - * @param rndRange Random range. - */ - public void setRandomRange(int rndRange) { - this.rndRange = rndRange; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java deleted file mode 100644 index 432ef47..0000000 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.springframework.beans.factory.xml.*; -import org.springframework.context.support.*; -import org.springframework.core.io.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Streamer benchmark. - */ -public class GridStreamerBenchmark { - - /** - * Entry point. Expects configuration URL to be provided. - * - * @param args Arguments. First argument is grid configuration. Second optional argument "-w" - stands for - * "worker", in this case no load will be generated on the node. - * @throws Exception In case of any error. - */ - public static void main(String[] args) throws Exception{ - if (args.length == 0) - throw new IllegalArgumentException("Configuration path is not provided."); - - String cfgPath = args.length > 0 ? args[0] : - "modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml"; - - boolean worker = args.length > 1 && "-w".equalsIgnoreCase(args[1]); - - // Get load definitions. - Collection<GridStreamerLoad> loads = worker ? null : loads(cfgPath); - - // Start the grid. - Ignite ignite = G.start(cfgPath); - - // Start load threads. - Collection<Thread> loadThreads = new HashSet<>(); - - if (loads != null && !loads.isEmpty()) { - for (GridStreamerLoad load : loads) { - final IgniteStreamer streamer = ignite.streamer(load.getName()); - - if (streamer == null) - throw new Exception("Steamer is not found: " + load.getName()); - - List<IgniteInClosure<IgniteStreamer>> clos = load.getClosures(); - - if (clos != null && !clos.isEmpty()) { - for (final IgniteInClosure<IgniteStreamer> clo : clos) { - Thread t = new Thread(new Runnable() { - @Override public void run() { - try { - clo.apply(streamer); - } - catch (Exception e) { - X.println("Exception during execution of closure for streamer " + - "[streamer=" + streamer.name() + ", closure=" + clo + ", err=" + - e.getMessage() + ']'); - - e.printStackTrace(); - } - } - }); - - loadThreads.add(t); - - t.start(); - } - } - } - } - - // Once all loads are started, simply join them. - System.out.println("Press enter to stop running benchmark."); - - try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { - in.readLine(); - } - - for (Thread t : loadThreads) - t.interrupt(); - - for (Thread t : loadThreads) - t.join(); - } - - /** - * Get loads from the Spring context. - * - * @param cfgPath Configuration path. - * @return Collection of loads, if any. - * @throws Exception If failed. - */ - private static Collection<GridStreamerLoad> loads(String cfgPath) throws Exception { - URL cfgUrl; - - try { - cfgUrl = new URL(cfgPath); - } - catch (MalformedURLException ignore) { - cfgUrl = U.resolveIgniteUrl(cfgPath); - } - - if (cfgUrl == null) - throw new Exception("Spring XML configuration path is invalid: " + cfgPath); - - GenericApplicationContext springCtx = new GenericApplicationContext(); - - new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl)); - - springCtx.refresh(); - - Map<String, GridStreamerLoad> cfgMap = springCtx.getBeansOfType(GridStreamerLoad.class); - - return cfgMap.values(); - } -}