http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java deleted file mode 100644 index 9132972..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java +++ /dev/null @@ -1,804 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.streamer.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -/** - * Window that is bounded by size and accumulates events to batches. - */ -public class StreamerBoundedSizeBatchWindow<E> extends StreamerWindowAdapter<E> { - /** Batch size. */ - private int batchSize; - - /** Maximum number of batches. */ - private int maxBatches; - - /** Reference for queue and size. */ - private volatile QueueHolder holder; - - /** Enqueue lock. */ - private ReadWriteLock enqueueLock = new ReentrantReadWriteLock(); - - /** - * Gets maximum number of batches that can be stored in window. - * - * @return Maximum number of batches for window. - */ - public int getMaximumBatches() { - return maxBatches; - } - - /** - * Sets maximum number of batches that can be stored in window. - * - * @param maxBatches Maximum number of batches for window. - */ - public void setMaximumBatches(int maxBatches) { - this.maxBatches = maxBatches; - } - - /** - * Gets batch size. - * - * @return Batch size. - */ - public int getBatchSize() { - return batchSize; - } - - /** - * Sets batch size. - * - * @param batchSize Batch size. 
- */ - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - /** {@inheritDoc} */ - @Override public void checkConfiguration() { - if (batchSize <= 0) - throw new IgniteException("Failed to initialize window (batchSize size must be positive) " + - "[windowClass=" + getClass().getSimpleName() + - ", maximumBatches=" + maxBatches + - ", batchSize=" + batchSize + ']'); - - if (maxBatches < 0) - throw new IgniteException("Failed to initialize window (maximumBatches cannot be negative) " + - "[windowClass=" + getClass().getSimpleName() + - ", maximumBatches=" + maxBatches + - ", batchSize=" + batchSize + ']'); - } - - /** {@inheritDoc} */ - @Override protected void stop0() { - // No-op. - } - - /** {@inheritDoc} */ - @Override protected void reset0() { - ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>(); - - Batch b = new Batch(batchSize); - - ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b); - - b.node(n); - - holder = new QueueHolder(first, new AtomicInteger(1), new AtomicInteger()); - } - - /** {@inheritDoc} */ - @Override public int size() { - return holder.totalQueueSize().get(); - } - - /** {@inheritDoc} */ - @Override protected GridStreamerWindowIterator<E> iterator0() { - final QueueHolder win = holder; - - final Iterator<Batch> batchIt = win.batchQueue().iterator(); - - return new GridStreamerWindowIterator<E>() { - /** Current batch iterator. */ - private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt; - - /** Next batch iterator. Will be null if no more batches available. */ - private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt; - - /** Last returned value. */ - private E lastRet; - - { - curBatchIt = batchIt.hasNext() ? 
batchIt.next().iterator() : null; - - // First batch may already be empty - look ahead for the next non-empty one. - if (curBatchIt != null && !curBatchIt.hasNext()) - advanceBatch(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean hasNext() { - if (curBatchIt != null) { - if (curBatchIt.hasNext()) - return true; - - return nextBatchIt != null && nextBatchIt.hasNext(); - } - else - return false; - } - - /** {@inheritDoc} */ - @Override public E next() { - if (curBatchIt == null) - throw new NoSuchElementException(); - - if (!curBatchIt.hasNext()) { - if (nextBatchIt != null) { - curBatchIt = nextBatchIt; - - nextBatchIt = null; - - lastRet = curBatchIt.next(); - - // Switched batch may hold a single element - look ahead again, - // otherwise remaining batches would never be visited. - if (!curBatchIt.hasNext()) - advanceBatch(); - } - else - throw new NoSuchElementException(); - } - else { - E next = curBatchIt.next(); - - // Moved to last element in batch - check for next iterator. - if (!curBatchIt.hasNext()) - advanceBatch(); - - lastRet = next; - } - - return lastRet; - } - - /** {@inheritDoc} */ - @Nullable @Override public E removex() { - if (curBatchIt == null) - throw new NoSuchElementException(); - - if (curBatchIt.removex()) { - // Decrement global size if deleted. - win.totalQueueSize().decrementAndGet(); - - return lastRet; - } - else - return null; - } - - /** - * Looks ahead for the next non-empty batch. Sets {@code nextBatchIt} to its iterator - * or to {@code null} if no such batch is left. - */ - private void advanceBatch() { - nextBatchIt = null; - - // Skip empty batches so that iteration does not stop before reaching the last batch. - while (batchIt.hasNext()) { - ConcurrentLinkedDeque8.IteratorEx<E> it = batchIt.next().iterator(); - - if (it.hasNext()) { - nextBatchIt = it; - - break; - } - } - } - }; - } - - /** {@inheritDoc} */ - @Override public int evictionQueueSize() { - QueueHolder win = holder; - - int oversizeCnt = Math.max(0, win.batchQueueSize().get() - maxBatches); - - Iterator<Batch> it = win.batchQueue().iterator(); - - int size = 0; - - // Only the first oversizeCnt batches are counted, no need to traverse the rest of the queue. - for (int idx = 0; idx < oversizeCnt && it.hasNext(); idx++) - size += it.next().size(); - - return size; - } - - /** {@inheritDoc} */ - @Override protected boolean enqueue0(E evt) { - try { - return enqueueInternal(evt); - } - catch (IgniteInterruptedCheckedException ignored) { - return false; - } - } - - /** - * Enqueue event to window. - * - * @param evt Event to add. 
- * @return {@code True} if event was added. - * - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread was interrupted. - */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - private boolean enqueueInternal(E evt) throws IgniteInterruptedCheckedException { - QueueHolder tup = holder; - - ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue(); - AtomicInteger size = tup.batchQueueSize(); - - while (true) { - Batch last = evts.peekLast(); - - if (last == null || !last.add(evt)) { - // This call will ensure that last object is actually added to batch - // before we add new batch to events queue. - // If exception is thrown here, window will be left in consistent state. - if (last != null) - last.finish(); - - // Add new batch to queue in write lock. - if (enqueueLock.writeLock().tryLock()) { - try { - Batch first0 = evts.peekLast(); - - if (first0 == last) { - Batch batch = new Batch(batchSize); - - ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch); - - batch.node(node); - - size.incrementAndGet(); - - if (batch.removed() && evts.unlinkx(node)) - size.decrementAndGet(); - } - } - finally { - enqueueLock.writeLock().unlock(); - } - } - else { - // Acquire read lock to wait for batch enqueue. - enqueueLock.readLock().lock(); - - enqueueLock.readLock().unlock(); - } - } - else { - // Event was added, global size increment. - tup.totalQueueSize().incrementAndGet(); - - return true; - } - } - } - - /** {@inheritDoc} */ - @Override protected Collection<E> pollEvicted0(int cnt) { - QueueHolder tup = holder; - - ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue(); - AtomicInteger size = tup.batchQueueSize(); - - Collection<E> res = new ArrayList<>(cnt); - - while (true) { - int curSize = size.get(); - - if (curSize > maxBatches) { - // Just peek the first batch. 
- Batch first = evts.peekFirst(); - - if (first != null) { - assert first.finished(); - - Collection<E> polled = first.pollNonBatch(cnt - res.size()); - - if (!polled.isEmpty()) - res.addAll(polled); - - if (first.isEmpty()) { - ConcurrentLinkedDeque8.Node<Batch> node = first.node(); - - first.markRemoved(); - - if (node != null && evts.unlinkx(node)) - size.decrementAndGet(); - } - - if (res.size() == cnt) - break; - } - else - break; - } - else - break; - } - - // Removed entries, update global size. - tup.totalQueueSize().addAndGet(-res.size()); - - return res; - } - - /** {@inheritDoc} */ - @Override protected Collection<E> pollEvictedBatch0() { - QueueHolder tup = holder; - - ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue(); - AtomicInteger size = tup.batchQueueSize(); - - while (true) { - int curSize = size.get(); - - if (curSize > maxBatches) { - if (size.compareAndSet(curSize, curSize - 1)) { - Batch polled = evts.poll(); - - if (polled != null) { - assert polled.finished(); - - // Mark batch deleted for consistency. - polled.markRemoved(); - - Collection<E> polled0 = polled.shrink(); - - // Result of shrink is empty, must retry the poll. - if (!polled0.isEmpty()) { - // Update global size. - tup.totalQueueSize().addAndGet(-polled0.size()); - - return polled0; - } - } - else { - // Polled was zero, so we must restore counter and return. - size.incrementAndGet(); - - return Collections.emptyList(); - } - } - } - else - return Collections.emptyList(); - } - } - - /** {@inheritDoc} */ - @Override protected Collection<E> dequeue0(int cnt) { - QueueHolder tup = holder; - - ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue(); - AtomicInteger size = tup.batchQueueSize(); - - Collection<E> res = new ArrayList<>(cnt); - - while (true) { - // Just peek the first batch. 
- Batch first = evts.peekFirst(); - - if (first != null) { - Collection<E> polled = first.pollNonBatch(cnt - res.size()); - - // We must check for finished before unlink as no elements - // can be added to batch after it is finished. - if (first.isEmpty() && first.emptyFinished()) { - ConcurrentLinkedDeque8.Node<Batch> node = first.node(); - - first.markRemoved(); - - if (node != null && evts.unlinkx(node)) - size.decrementAndGet(); - - assert first.isEmpty(); - } - else if (polled.isEmpty()) - break; - - res.addAll(polled); - - if (res.size() == cnt) - break; - } - else - break; - } - - // Update global size. - tup.totalQueueSize().addAndGet(-res.size()); - - return res; - } - - /** - * Consistency check, used for testing. - */ - void consistencyCheck() { - QueueHolder win = holder; - - Iterator<E> it = iterator(); - - int cnt = 0; - - while (it.hasNext()) { - it.next(); - - cnt++; - } - - int cnt0 = 0; - - for (Batch batch : win.batchQueue()) - cnt0 += batch.size(); - - int sz = size(); - - assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']'; - assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']'; - assert win.batchQueue().size() == win.batchQueueSize().get(); - } - - /** - * Window structure. - */ - private class QueueHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public QueueHolder() { - // No-op. - } - - /** - * @param batchQueue Batch queue. - * @param batchQueueSize Batch queue size counter. - * @param globalSize Global size counter. 
- */ - private QueueHolder(ConcurrentLinkedDeque8<Batch> batchQueue, - AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) { - super(batchQueue, batchQueueSize, globalSize); - - assert batchQueue.size() == 1; - assert batchQueueSize.get() == 1; - } - - /** - * @return Events queue. - */ - @SuppressWarnings("ConstantConditions") - public ConcurrentLinkedDeque8<Batch> batchQueue() { - return get1(); - } - - /** - * @return Batch queue size. - */ - @SuppressWarnings("ConstantConditions") - public AtomicInteger batchQueueSize() { - return get2(); - } - - /** - * @return Global queue size. - */ - @SuppressWarnings("ConstantConditions") - public AtomicInteger totalQueueSize() { - return get3(); - } - } - - /** - * Batch. - */ - private class Batch extends ReentrantReadWriteLock implements Iterable<E> { - /** */ - private static final long serialVersionUID = 0L; - - /** Batch events. */ - private ConcurrentLinkedDeque8<E> evts; - - /** Capacity. */ - private AtomicInteger cap; - - /** Finished. */ - private volatile boolean finished; - - /** Queue node. */ - @GridToStringExclude - private ConcurrentLinkedDeque8.Node<Batch> qNode; - - /** Node removed flag. */ - private volatile boolean rmvd; - - /** - * @param batchSize Batch size. - */ - private Batch(int batchSize) { - cap = new AtomicInteger(batchSize); - - evts = new ConcurrentLinkedDeque8<>(); - } - - /** - * @return {@code True} if batch is removed. - */ - public boolean removed() { - return rmvd; - } - - /** - * Marks batch as removed. - */ - public void markRemoved() { - rmvd = true; - } - - /** - * Adds event to batch. - * - * @param evt Event to add. - * @return {@code True} if event was added, {@code false} if batch is full. - */ - public boolean add(E evt) { - readLock().lock(); - - try { - if (finished) - return false; - - while (true) { - int size = cap.get(); - - if (size > 0) { - if (cap.compareAndSet(size, size - 1)) { - evts.add(evt); - - // Will go through write lock and finish batch. 
- if (size == 1) - finished = true; - - return true; - } - } - else - return false; - } - } - finally { - readLock().unlock(); - } - } - - /** - * @return Queue node. - */ - public ConcurrentLinkedDeque8.Node<Batch> node() { - return qNode; - } - - /** - * @param qNode Queue node. - */ - public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) { - this.qNode = qNode; - } - - /** - * Waits for latch count down after last event was added. - * - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If wait was interrupted. - */ - public void finish() throws IgniteInterruptedCheckedException { - writeLock().lock(); - - try { - // Safety. - assert cap.get() == 0; - assert finished; - } - finally { - writeLock().unlock(); - } - } - - /** - * @return {@code True} if batch is finished and no more events will be added to it. - */ - public boolean finished() { - readLock().lock(); - - try { - return finished; - } - finally { - readLock().unlock(); - } - } - - /** - * Gets batch size. - * - * @return Batch size. - */ - public int size() { - readLock().lock(); - - try { - return evts == null ? 0 : evts.sizex(); - } - finally { - readLock().unlock(); - } - } - - /** - * @return {@code True} if batch is empty. - */ - public boolean isEmpty() { - readLock().lock(); - - try { - return evts == null || evts.isEmpty(); - } - finally { - readLock().unlock(); - } - } - - /** - * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will - * be added to batch and it can be safely unlinked from the queue. - * - * @return {@code True} if batch is empty and finished. 
- */ - public boolean emptyFinished() { - writeLock().lock(); - - try { - return finished && (evts == null || evts.isEmpty()); - } - finally { - writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() { - readLock().lock(); - - try { - if (evts != null) - return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator(); - - return new ConcurrentLinkedDeque8.IteratorEx<E>() { - @Override public boolean removex() { - throw new NoSuchElementException(); - } - - @Override public boolean hasNext() { - return false; - } - - @Override public E next() { - throw new NoSuchElementException(); - } - - @Override public void remove() { - throw new NoSuchElementException(); - } - }; - } - finally { - readLock().unlock(); - } - } - - /** - * Polls up to {@code cnt} objects from batch in concurrent fashion. - * - * @param cnt Number of objects to poll. - * @return Collection of polled elements or empty collection if nothing to poll. - */ - public Collection<E> pollNonBatch(int cnt) { - readLock().lock(); - - try { - if (evts == null) - return Collections.emptyList(); - - Collection<E> res = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - E evt = evts.poll(); - - if (evt != null) - res.add(evt); - else - return res; - } - - return res; - } - finally { - readLock().unlock(); - } - } - - /** - * Shrinks this batch. No events can be polled from it after this method. - * - * @return Collection of events contained in batch before shrink (empty collection in - * case no events were present). - */ - public Collection<E> shrink() { - writeLock().lock(); - - try { - if (evts == null) - return Collections.emptyList(); - - // Since iterator can concurrently delete elements, we must poll here. - Collection<E> res = new ArrayList<>(evts.sizex()); - - E o; - - while ((o = evts.poll()) != null) - res.add(o); - - // Nothing can be polled after shrink. 
- evts = null; - - return res; - } - finally { - writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - ConcurrentLinkedDeque8<E> evts0 = evts; - - return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex()); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java deleted file mode 100644 index f2bfcf3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.internal.processors.streamer.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Size-bounded sorted window. Unlike {@link StreamerBoundedSizeWindow}, which limits - * window only on size, this window also provides events in sorted order. 
- */ -public class StreamerBoundedSizeSortedWindow<E> - extends StreamerBoundedSizeWindowAdapter<E, StreamerBoundedSizeSortedWindow.Holder<E>> { - /** Comparator. */ - private Comparator<E> comp; - - /** Order counter. */ - private AtomicLong orderCnt = new AtomicLong(); - - /** - * Gets event comparator. - * - * @return Event comparator. - */ - public Comparator<E> getComparator() { - return comp; - } - - /** - * Sets event comparator. - * - * @param comp Comparator. - */ - public void setComparator(Comparator<E> comp) { - this.comp = comp; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected Collection<Holder<E>> newCollection() { - final Comparator<E> comp0 = comp; - - Collection<Holder<E>> col = new GridConcurrentSkipListSet<>(new Comparator<Holder<E>>() { - @Override public int compare(Holder<E> h1, Holder<E> h2) { - if (h1 == h2) - return 0; - - int diff = comp0 == null ? - ((Comparable<E>)h1.val).compareTo(h2.val) : comp0.compare(h1.val, h2.val); - - if (diff != 0) - return diff; - else { - assert h1.order != h2.order; - - return h1.order < h2.order ? 
-1 : 1; - } - } - }); - - return (Collection)col; - } - - /** {@inheritDoc} */ - @Override protected boolean addInternal(E evt, Collection<Holder<E>> col, @Nullable Set<E> set) { - // Without explicit comparator events are ordered naturally, so they must be comparable. - if (comp == null) { - if (!(evt instanceof Comparable)) - throw new IllegalArgumentException("Failed to add object to window (object is not comparable and no " + - "comparator is specified): " + evt); - } - - if (set != null) { - if (set.add(evt)) { - col.add(new Holder<>(evt, orderCnt.getAndIncrement())); - - return true; - } - - return false; - } - else { - col.add(new Holder<>(evt, orderCnt.getAndIncrement())); - - return true; - } - } - - /** {@inheritDoc} */ - @Override protected int addAllInternal(Collection<E> evts, Collection<Holder<E>> col, @Nullable Set<E> set) { - int cnt = 0; - - for (E evt : evts) { - if (addInternal(evt, col, set)) - cnt++; - } - - return cnt; - } - - /** {@inheritDoc} */ - @Override protected E pollInternal(Collection<Holder<E>> col, Set<E> set) { - // Collection holds value wrappers, not raw events, so cast to the set of holders. - Holder<E> h = ((NavigableSet<Holder<E>>)col).pollLast(); - - if (set != null && h != null) - set.remove(h.val); - - return h == null ? null : h.val; - } - - /** {@inheritDoc} */ - @Override protected GridStreamerWindowIterator<E> iteratorInternal(final Collection<Holder<E>> col, - final Set<E> set, final AtomicInteger size) { - final Iterator<Holder<E>> it = col.iterator(); - - return new GridStreamerWindowIterator<E>() { - private Holder<E> lastRet; - - @Override public boolean hasNext() { - return it.hasNext(); - } - - @Override public E next() { - lastRet = it.next(); - - return lastRet.val; - } - - @Override public E removex() { - if (lastRet == null) - throw new IllegalStateException(); - - if (col.remove(lastRet)) { - if (set != null) - set.remove(lastRet.val); - - size.decrementAndGet(); - - return lastRet.val; - } - else - return null; - } - }; - } - - /** - * Value wrapper. - */ - @SuppressWarnings("PackageVisibleInnerClass") - static class Holder<E> { - /** Value. 
*/ - private E val; - - /** Order to distinguish between objects for which comparator returns 0. */ - private long order; - - /** - * @param val Value to hold. - * @param order Adding order. - */ - private Holder(E val, long order) { - this.val = val; - this.order = order; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - // Null-safe to stay consistent with equals(), which compares values via F.eq(). - return val == null ? 0 : val.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - // Object is always equal to itself (reflexivity). - if (this == obj) - return true; - - if (!(obj instanceof Holder)) - return false; - - Holder h = (Holder)obj; - - return F.eq(val, h.val) && order == h.order; - } - } - - /** {@inheritDoc} */ - @Override protected void consistencyCheck(Collection<Holder<E>> col, Set<E> set, AtomicInteger size) { - assert col.size() == size.get(); - - if (set != null) { - // Check no duplicates in collection. - - Collection<Object> vals = new HashSet<>(); - - for (Object evt : col) - assert vals.add(((Holder)evt).val); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java deleted file mode 100644 index 16033eb..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.internal.processors.streamer.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Queue window bounded by number of elements in queue. After adding elements to this window caller - * must check for evicted events. - * <p> - * It is guaranteed that window size will never get less than maximum size when polling from this window - * concurrently from different threads. - */ -public class StreamerBoundedSizeWindow<E> extends StreamerBoundedSizeWindowAdapter<E, E> { - /** {@inheritDoc} */ - @Override protected Collection<E> newCollection() { - return new ConcurrentLinkedDeque8<>(); - } - - /** {@inheritDoc} */ - @Override public GridStreamerWindowIterator<E> iteratorInternal(Collection<E> col, final Set<E> set, - final AtomicInteger size) { - final ConcurrentLinkedDeque8.IteratorEx<E> it = - (ConcurrentLinkedDeque8.IteratorEx<E>)col.iterator(); - - return new GridStreamerWindowIterator<E>() { - private E lastRet; - - @Override public boolean hasNext() { - return it.hasNext(); - } - - @Override public E next() { - lastRet = it.next(); - - return lastRet; - } - - @Override public E removex() { - if (it.removex()) { - if (set != null) - set.remove(lastRet); - - size.decrementAndGet(); - - return lastRet; - } - else - return null; - } - }; - } - - /** {@inheritDoc} */ - @SuppressWarnings("IfMayBeConditional") - @Override protected boolean addInternal(E evt, Collection<E> col, Set<E> set) { - 
assert col instanceof ConcurrentLinkedDeque8; - - // If unique. - if (set != null) { - if (set.add(evt)) { - col.add(evt); - - return true; - } - - return false; - } - else { - col.add(evt); - - return true; - } - } - - /** {@inheritDoc} */ - @Override protected int addAllInternal(Collection<E> evts, Collection<E> col, Set<E> set) { - assert col instanceof ConcurrentLinkedDeque8; - if (set != null) { - int cnt = 0; - - for (E evt : evts) { - if (set.add(evt)) { - col.add(evt); - - cnt++; - } - } - - return cnt; - } - else { - col.addAll(evts); - - return evts.size(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override protected E pollInternal(Collection<E> col, Set<E> set) { - assert col instanceof ConcurrentLinkedDeque8; - - E res = ((Queue<E>)col).poll(); - - if (set != null && res != null) - set.remove(res); - - return res; - } - - /** {@inheritDoc} */ - @Override protected void consistencyCheck(Collection<E> col, Set<E> set, AtomicInteger size) { - assert col.size() == size.get(); - - if (set != null) { - // Check no duplicates in collection. - - Collection<Object> vals = new HashSet<>(); - - for (Object evt : col) - assert vals.add(evt); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java deleted file mode 100644 index ff54ad0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java +++ /dev/null @@ -1,357 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.streamer.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -
/**
 * Abstract non-public class for size-bound windows. Supports reset.
 * <p>
 * The window state (underlying collection, optional uniqueness set and size counter) lives in a single
 * holder object that is swapped as a whole by {@link #reset0()}.
 *
 * @param <E> Event type.
 * @param <T> Element type of the underlying collection.
 */
abstract class StreamerBoundedSizeWindowAdapter<E, T> extends StreamerWindowAdapter<E> {
    /** Reference to the current window state. */
    private AtomicReference<WindowHolder> ref = new AtomicReference<>();

    /** If true, only unique elements will be accepted. */
    private boolean unique;

    /** Window maximum size. */
    protected int maxSize;

    /**
     * Gets window maximum size.
     *
     * @return Maximum size.
     */
    public int getMaximumSize() {
        return maxSize;
    }

    /**
     * Sets window maximum size.
     *
     * @param maxSize Maximum size.
     */
    public void setMaximumSize(int maxSize) {
        this.maxSize = maxSize;
    }

    /**
     * @return True if only unique elements will be accepted.
     */
    public boolean isUnique() {
        return unique;
    }

    /**
     * @param unique If true, only unique elements will be accepted.
     */
    public void setUnique(boolean unique) {
        this.unique = unique;
    }

    /** {@inheritDoc} */
    @Override public void checkConfiguration() {
        if (maxSize < 0)
            throw new IgniteException("Failed to initialize window (maximumSize cannot be negative) " +
                "[windowClass=" + getClass().getSimpleName() +
                ", maxSize=" + maxSize +
                ", unique=" + unique + ']');
    }

    /** {@inheritDoc} */
    @Override protected void stop0() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public int size() {
        int size = ref.get().size().get();

        // Pollers decrement the counter before they actually poll, so clamp transient negative values.
        return size > 0 ? size : 0;
    }

    /** {@inheritDoc} */
    @Override public int evictionQueueSize() {
        int evictSize = size() - maxSize;

        return evictSize > 0 ? evictSize : 0;
    }

    /** {@inheritDoc} */
    @Override protected boolean enqueue0(E evt) {
        add(evt);

        return true;
    }

    /** {@inheritDoc} */
    @Override protected Collection<E> pollEvicted0(int cnt) {
        Collection<E> res = new ArrayList<>(cnt);

        for (int i = 0; i < cnt; i++) {
            E evicted = pollEvictedInternal();

            if (evicted == null)
                return res;

            res.add(evicted);
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override protected Collection<E> pollEvictedBatch0() {
        E res = pollEvictedInternal();

        if (res == null)
            return Collections.emptyList();

        return Collections.singleton(res);
    }

    /**
     * Polls a single event if the window currently holds more than {@code maxSize} events.
     *
     * @return Evicted element or {@code null} if nothing could be evicted.
     */
    @Nullable private E pollEvictedInternal() {
        WindowHolder tup = ref.get();

        AtomicInteger size = tup.size();

        while (true) {
            int curSize = size.get();

            if (curSize > maxSize) {
                // Reserve one slot first; a failed CAS means a concurrent update, so re-read and retry.
                if (size.compareAndSet(curSize, curSize - 1)) {
                    E evt = pollInternal(tup.collection(), tup.set());

                    if (evt != null)
                        return evt;
                    else {
                        // No actual events in queue, it means that other thread is just adding event.
                        // Restore the counter and return null as it is a concurrent add call.
                        size.incrementAndGet();

                        return null;
                    }
                }
            }
            else
                return null;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns at most {@code cnt} events; a request for zero events removes nothing.
     */
    @Override protected Collection<E> dequeue0(int cnt) {
        WindowHolder tup = ref.get();

        AtomicInteger size = tup.size();
        Collection<T> evts = tup.collection();

        Collection<E> resCol = new ArrayList<>(cnt);

        // The bound is checked before a slot is reserved, so that cnt == 0 never polls an event.
        while (resCol.size() < cnt) {
            int curSize = size.get();

            if (curSize <= 0)
                break;

            if (size.compareAndSet(curSize, curSize - 1)) {
                E res = pollInternal(evts, tup.set());

                if (res == null) {
                    // Counter was ahead of the collection (concurrent add in progress), restore it.
                    size.incrementAndGet();

                    break;
                }

                resCol.add(res);
            }
        }

        return resCol;
    }

    /** {@inheritDoc} */
    @Override protected GridStreamerWindowIterator<E> iterator0() {
        WindowHolder win = ref.get();

        return iteratorInternal(win.collection(), win.set(), win.size());
    }

    /** {@inheritDoc} */
    @Override protected void reset0() {
        ref.set(new WindowHolder(newCollection(),
            unique ? new GridConcurrentHashSet<E>() : null,
            new AtomicInteger()));
    }

    /**
     * Adds event to the current window state and increments the size counter if it was accepted.
     *
     * @param evt Event to add.
     */
    private void add(E evt) {
        WindowHolder tup = ref.get();

        if (addInternal(evt, tup.collection(), tup.set()))
            tup.size().incrementAndGet();
    }

    /**
     * Adds events to the current window state and increases the size counter by the number accepted.
     *
     * @param evts Events to add.
     */
    private void addAll(Collection<E> evts) {
        WindowHolder tup = ref.get();

        int cnt = addAllInternal(evts, tup.collection(), tup.set());

        tup.size().addAndGet(cnt);
    }

    /**
     * Checks window consistency. Used for testing.
     */
    void consistencyCheck() {
        WindowHolder win = ref.get();

        consistencyCheck(win.collection(), win.set(), win.size());
    }

    /**
     * Get underlying collection.
     *
     * @return Collection.
     */
    @SuppressWarnings("ConstantConditions")
    protected Collection<T> collection() {
        return ref.get().get1();
    }

    /**
     * Creates new collection specific for window implementation. This collection will be subsequently passed
     * to addInternal(...) and pollInternal() methods.
     *
     * @return Collection - holder.
     */
    protected abstract Collection<T> newCollection();

    /**
     * Adds event to queue implementation.
     *
     * @param evt Event to add.
     * @param col Collection to add to.
     * @param set Set to check.
     * @return {@code True} if event was added.
     */
    protected abstract boolean addInternal(E evt, Collection<T> col, @Nullable Set<E> set);

    /**
     * Adds all events to queue implementation.
     *
     * @param evts Events to add.
     * @param col Collection to add to.
     * @param set Set to check.
     * @return Added events number.
     */
    protected abstract int addAllInternal(Collection<E> evts, Collection<T> col, @Nullable Set<E> set);

    /**
     * @param col Collection to poll from.
     * @param set Set to check.
     * @return Polled object.
     */
    @Nullable protected abstract E pollInternal(Collection<T> col, @Nullable Set<E> set);

    /**
     * Creates iterator based on implementation collection type.
     *
     * @param col Collection.
     * @param set Set to check.
     * @param size Size.
     * @return Iterator.
     */
    protected abstract GridStreamerWindowIterator<E> iteratorInternal(Collection<T> col, @Nullable Set<E> set,
        AtomicInteger size);

    /**
     * Checks consistency. Used in tests.
     *
     * @param col Collection.
     * @param set Set if unique.
     * @param size Size holder.
     */
    protected abstract void consistencyCheck(Collection<T> col, Set<E> set, AtomicInteger size);

    /**
     * Window holder: underlying collection, uniqueness set (may be {@code null}) and size counter.
     */
    @SuppressWarnings("ConstantConditions")
    private class WindowHolder extends GridTuple3<Collection<T>, Set<E>, AtomicInteger> {
        /** */
        private static final long serialVersionUID = 0L;

        /**
         * Empty constructor required by {@link Externalizable}.
         */
        public WindowHolder() {
            // No-op.
        }

        /**
         * @param col Collection.
         * @param set Set if unique.
         * @param size Window size counter.
         */
        WindowHolder(@Nullable Collection<T> col, @Nullable Set<E> set, @Nullable AtomicInteger size) {
            super(col, set, size);
        }

        /**
         * @return Collection.
         */
        public Collection<T> collection() {
            return get1();
        }

        /**
         * @return Set.
         */
        public Set<E> set() {
            return get2();
        }

        /**
         * @return Size.
         */
        public AtomicInteger size() {
            return get3();
        }
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java deleted file mode 100644 index a2aae67..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java +++ /dev/null @@ -1,906 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.streamer.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -
/**
 * Window that accumulates events in batches, and is bounded by time and maximum number of batches.
 *
 * @param <E> Event type.
 */
public class StreamerBoundedTimeBatchWindow<E> extends StreamerWindowAdapter<E> {
    /** Batch size. */
    private int batchSize;

    /** Maximum batches. */
    private int maxBatches;

    /** Batch time interval; added to {@code U.currentTimeMillis()} to compute a batch end timestamp. */
    private long batchTimeInterval;

    /** Atomic reference for queue and size. */
    private AtomicReference<WindowHolder> ref = new AtomicReference<>();

    /** Enqueue lock. */
    private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();

    /**
     * Gets maximum number of batches can be stored in window.
     *
     * @return Maximum number of batches for window.
     */
    public int getMaximumBatches() {
        return maxBatches;
    }

    /**
     * Sets maximum number of batches can be stored in window.
     *
     * @param maxBatches Maximum number of batches for window.
     */
    public void setMaximumBatches(int maxBatches) {
        this.maxBatches = maxBatches;
    }

    /**
     * Gets batch size.
     *
     * @return Batch size.
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Sets batch size.
     *
     * @param batchSize Batch size.
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Gets batch time interval.
     *
     * @return Batch time interval.
     */
    public long getBatchTimeInterval() {
        return batchTimeInterval;
    }

    /**
     * Sets batch time interval.
     *
     * @param batchTimeInterval Batch time interval.
     */
    public void setBatchTimeInterval(long batchTimeInterval) {
        this.batchTimeInterval = batchTimeInterval;
    }

    /**
     * {@inheritDoc}
     * <p>
     * A batch size of zero is replaced with {@link Integer#MAX_VALUE}, so batches are then bounded by time only.
     */
    @Override public void checkConfiguration() {
        if (maxBatches < 0)
            throw new IgniteException("Failed to initialize window (maximumBatches cannot be negative) " +
                "[windowClass=" + getClass().getSimpleName() +
                ", maximumBatches=" + maxBatches +
                ", batchSize=" + batchSize +
                ", batchTimeInterval=" + batchTimeInterval + ']');

        if (batchSize < 0)
            throw new IgniteException("Failed to initialize window (batchSize cannot be negative) " +
                "[windowClass=" + getClass().getSimpleName() +
                ", maximumBatches=" + maxBatches +
                ", batchSize=" + batchSize +
                ", batchTimeInterval=" + batchTimeInterval + ']');
        else if (batchSize == 0)
            batchSize = Integer.MAX_VALUE;

        if (batchTimeInterval <= 0)
            throw new IgniteException("Failed to initialize window (batchTimeInterval must be positive) " +
                "[windowClass=" + getClass().getSimpleName() +
                ", maximumBatches=" + maxBatches +
                ", batchSize=" + batchSize +
                ", batchTimeInterval=" + batchTimeInterval + ']');
    }

    /** {@inheritDoc} */
    @Override protected void stop0() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override protected void reset0() {
        ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>();

        Batch b = new Batch(batchSize, U.currentTimeMillis() + batchTimeInterval);

        ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b);

        b.node(n);

        // New state always starts with exactly one (empty) batch.
        ref.set(new WindowHolder(first, new AtomicInteger(1), new AtomicInteger()));
    }

    /** {@inheritDoc} */
    @Override public int size() {
        return ref.get().totalQueueSize().get();
    }

    /**
     * {@inheritDoc}
     * <p>
     * The returned iterator walks batches in queue order and skips batches that have no events.
     */
    @Override protected GridStreamerWindowIterator<E> iterator0() {
        final WindowHolder win = ref.get();

        final Iterator<Batch> batchIt = win.batchQueue().iterator();

        return new GridStreamerWindowIterator<E>() {
            /** Iterator of the batch that produced the last returned element. */
            private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;

            /** Look-ahead iterator over the next non-empty batch. Will be null if no more batches available. */
            private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;

            /** Last returned value. */
            private E lastRet;

            {
                curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;

                // First batch may be empty while later ones are not, so start look-ahead right away.
                if (curBatchIt != null && !curBatchIt.hasNext())
                    advanceBatch();
            }

            /** {@inheritDoc} */
            @Override public boolean hasNext() {
                if (curBatchIt == null)
                    return false;

                return curBatchIt.hasNext() || (nextBatchIt != null && nextBatchIt.hasNext());
            }

            /** {@inheritDoc} */
            @Override public E next() {
                if (curBatchIt == null)
                    throw new NoSuchElementException();

                if (!curBatchIt.hasNext()) {
                    if (nextBatchIt == null)
                        throw new NoSuchElementException();

                    // Current batch is exhausted, switch to the look-ahead batch.
                    curBatchIt = nextBatchIt;

                    nextBatchIt = null;
                }

                lastRet = curBatchIt.next();

                // Moved to last element in batch - look ahead again. The current iterator is kept so that
                // removex() still operates on the batch that produced the last returned element.
                if (!curBatchIt.hasNext())
                    advanceBatch();

                return lastRet;
            }

            /** {@inheritDoc} */
            @Override public E removex() {
                if (curBatchIt == null)
                    throw new NoSuchElementException();

                if (curBatchIt.removex()) {
                    // Decrement global size if deleted.
                    win.totalQueueSize().decrementAndGet();

                    return lastRet;
                }
                else
                    return null;
            }

            /**
             * Moves look-ahead to the next batch that has at least one event, skipping empty batches.
             * NOTE(review): relies on a batch iterator that reported hasNext() == true still being able
             * to return an element later, as the switch in next() already assumed - confirm for
             * ConcurrentLinkedDeque8.
             */
            private void advanceBatch() {
                nextBatchIt = null;

                while (batchIt.hasNext()) {
                    ConcurrentLinkedDeque8.IteratorEx<E> it = batchIt.next().iterator();

                    if (it.hasNext()) {
                        nextBatchIt = it;

                        break;
                    }
                }
            }
        };
    }

    /** {@inheritDoc} */
    @Override public int evictionQueueSize() {
        WindowHolder win = ref.get();

        // Number of leading batches that exceed the maximum batches limit (limit is ignored when it is 0).
        int oversizeCnt = maxBatches > 0 ? Math.max(0, win.batchQueueSize().get() - maxBatches) : 0;

        long now = U.currentTimeMillis();

        Iterator<Batch> it = win.batchQueue().iterator();

        int size = 0;

        int idx = 0;

        while (it.hasNext()) {
            Batch batch = it.next();

            // Batch counts as evictable either by position (oversize) or by expired end timestamp.
            if (idx++ < oversizeCnt || batch.batchEndTs < now)
                size += batch.size();
        }

        return size;
    }

    /** {@inheritDoc} */
    @Override protected boolean enqueue0(E evt) {
        try {
            return enqueue0(evt, U.currentTimeMillis());
        }
        catch (IgniteInterruptedCheckedException ignored) {
            // Interruption is reported to the caller as "event was not added".
            return false;
        }
    }

    /**
     * Enqueue event to window.
     *
     * @param evt Event to add.
     * @param ts Event timestamp.
     * @return {@code True} if event was added.
     *
     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread was interrupted.
     */
    private boolean enqueue0(E evt, long ts) throws IgniteInterruptedCheckedException {
        WindowHolder tup = ref.get();

        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
        AtomicInteger size = tup.batchQueueSize();

        while (true) {
            Batch last = evts.peekLast();

            if (last == null || !last.add(evt, ts)) {
                // This call will ensure that last object is actually added to batch
                // before we add new batch to events queue.
                // If exception is thrown here, window will be left in consistent state.
                if (last != null)
                    last.finish();

                // Add new batch to queue in write lock.
                if (enqueueLock.writeLock().tryLock()) {
                    try {
                        Batch first0 = evts.peekLast();

                        // Append only if nobody else has appended a batch since we peeked.
                        if (first0 == last) {
                            Batch batch = new Batch(batchSize, ts + batchTimeInterval);

                            ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);

                            batch.node(node);

                            size.incrementAndGet();

                            // If batch was removed in other thread.
                            if (batch.removed() && evts.unlinkx(node))
                                size.decrementAndGet();
                        }
                    }
                    finally {
                        enqueueLock.writeLock().unlock();
                    }
                }
                else {
                    // Acquire read lock to wait for batch enqueue.
                    enqueueLock.readLock().lock();

                    try {
                        evts.peekLast();
                    }
                    finally {
                        enqueueLock.readLock().unlock();
                    }
                }
            }
            else {
                // Event was added, global size increment.
                tup.totalQueueSize().incrementAndGet();

                return true;
            }
        }
    }

    /** {@inheritDoc} */
    @Override protected Collection<E> pollEvicted0(int cnt) {
        WindowHolder tup = ref.get();

        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
        AtomicInteger size = tup.batchQueueSize();

        Collection<E> res = new ArrayList<>(cnt);

        while (true) {
            int curSize = size.get();

            // Just peek the first batch.
            Batch first = evts.peekFirst();

            if (first != null && ((maxBatches > 0 && curSize > maxBatches) || first.checkExpired())) {
                assert first.finished();

                Collection<E> polled = first.pollNonBatch(cnt - res.size());

                if (!polled.isEmpty())
                    res.addAll(polled);

                if (first.isEmpty()) {
                    ConcurrentLinkedDeque8.Node<Batch> node = first.node();

                    first.markRemoved();

                    if (node != null && evts.unlinkx(node))
                        size.decrementAndGet();
                }

                if (res.size() == cnt)
                    break;
            }
            else
                break;
        }

        // Removed entries, update global size.
        tup.totalQueueSize().addAndGet(-res.size());

        return res;
    }

    /** {@inheritDoc} */
    @Override protected Collection<E> pollEvictedBatch0() {
        WindowHolder tup = ref.get();

        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
        AtomicInteger size = tup.batchQueueSize();

        while (true) {
            int curSize = size.get();

            if (maxBatches > 0 && curSize > maxBatches) {
                if (size.compareAndSet(curSize, curSize - 1)) {
                    Batch polled = evts.poll();

                    if (polled != null) {
                        assert polled.finished();

                        // Mark batch removed for consistency.
                        polled.markRemoved();

                        Collection<E> polled0 = polled.shrink();

                        // Result of shrink is empty, must retry the poll.
                        if (!polled0.isEmpty()) {
                            // Update global size.
                            tup.totalQueueSize().addAndGet(-polled0.size());

                            return polled0;
                        }
                    }
                    else {
                        // Nothing was polled, so we must restore counter and return.
                        size.incrementAndGet();

                        return Collections.emptyList();
                    }
                }
            }
            else {
                while (true) {
                    Batch batch = evts.peekFirst();

                    // This call will finish batch and return true if batch is expired.
                    if (batch != null && batch.checkExpired()) {
                        assert batch.finished();

                        ConcurrentLinkedDeque8.Node<Batch> node = batch.node();

                        batch.markRemoved();

                        if (node != null && evts.unlinkx(node))
                            size.decrementAndGet();

                        Collection<E> col = batch.shrink();

                        tup.totalQueueSize().addAndGet(-col.size());

                        if (!col.isEmpty())
                            return col;
                    }
                    else
                        return Collections.emptyList();
                }
            }
        }
    }

    /** {@inheritDoc} */
    @Override protected Collection<E> dequeue0(int cnt) {
        WindowHolder tup = ref.get();

        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
        AtomicInteger size = tup.batchQueueSize();

        Collection<E> res = new ArrayList<>(cnt);

        while (true) {
            // Just peek the first batch.
            Batch first = evts.peekFirst();

            if (first != null) {
                Collection<E> polled = first.pollNonBatch(cnt - res.size());

                // We must check for finished before unlink as no elements
                // can be added to batch after it is finished.
                if (first.isEmpty() && first.emptyFinished()) {
                    ConcurrentLinkedDeque8.Node<Batch> node = first.node();

                    first.markRemoved();

                    if (node != null && evts.unlinkx(node))
                        size.decrementAndGet();

                    assert first.isEmpty();
                }
                else if (polled.isEmpty())
                    break;

                res.addAll(polled);

                if (res.size() == cnt)
                    break;
            }
            else
                break;
        }

        // Update global size.
        tup.totalQueueSize().addAndGet(-res.size());

        return res;
    }

    /**
     * Consistency check, used for testing.
     */
    void consistencyCheck() {
        WindowHolder win = ref.get();

        Iterator<E> it = iterator();

        int cnt = 0;

        while (it.hasNext()) {
            it.next();

            cnt++;
        }

        int cnt0 = 0;

        for (Batch batch : win.batchQueue())
            cnt0 += batch.size();

        int sz = size();

        assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
        assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
        assert win.batchQueue().size() == win.batchQueueSize().get();
    }

    /**
     * Window structure: batch queue, batch queue size counter and total events counter.
     */
    @SuppressWarnings("ConstantConditions")
    private class WindowHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
        /** */
        private static final long serialVersionUID = 0L;

        /**
         * Empty constructor required by {@link Externalizable}.
         */
        public WindowHolder() {
            // No-op.
        }

        /**
         * @param batchQueue Batch queue.
         * @param batchQueueSize Batch queue size counter.
         * @param globalSize Global size counter.
         */
        private WindowHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
            AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
            super(batchQueue, batchQueueSize, globalSize);

            assert batchQueue.size() == 1;
            assert batchQueueSize.get() == 1;
        }

        /**
         * @return Events queue.
         */
        public ConcurrentLinkedDeque8<Batch> batchQueue() {
            return get1();
        }

        /**
         * @return Batch queue size.
         */
        public AtomicInteger batchQueueSize() {
            return get2();
        }

        /**
         * @return Global queue size.
         */
        public AtomicInteger totalQueueSize() {
            return get3();
        }
    }

    /**
     * Batch. The batch itself is the read-write lock that guards its {@code finished} flag and events queue.
     */
    private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
        /** */
        private static final long serialVersionUID = 0L;

        /** Batch events. Set to {@code null} by {@link #shrink()}. */
        private ConcurrentLinkedDeque8<E> evts;

        /** Remaining capacity. */
        private AtomicInteger cap;

        /** Batch end timestamp. */
        private final long batchEndTs;

        /** Finished flag. */
        private boolean finished;

        /** Queue node. */
        @GridToStringExclude
        private ConcurrentLinkedDeque8.Node<Batch> qNode;

        /** Removed flag. */
        private volatile boolean rmvd;

        /**
         * @param batchSize Batch size.
         * @param batchEndTs Batch end timestamp.
         */
        private Batch(int batchSize, long batchEndTs) {
            cap = new AtomicInteger(batchSize);
            this.batchEndTs = batchEndTs;

            evts = new ConcurrentLinkedDeque8<>();
        }

        /**
         * @return {@code True} if removed.
         */
        public boolean removed() {
            return rmvd;
        }

        /**
         * Marks batch as removed.
         */
        public void markRemoved() {
            rmvd = true;
        }

        /**
         * Adds event to batch.
         *
         * @param evt Event to add.
         * @param ts Event timestamp.
         * @return {@code True} if event was added, {@code false} if batch is full or the timestamp is
         *      past the batch end timestamp.
         */
        public boolean add(E evt, long ts) {
            if (ts <= batchEndTs) {
                readLock().lock();

                try {
                    if (finished)
                        // Finished was set inside write lock.
                        return false;

                    while (true) {
                        int size = cap.get();

                        if (size > 0) {
                            if (cap.compareAndSet(size, size - 1)) {
                                evts.add(evt);

                                // Last free slot was taken by this thread, so batch is finished.
                                if (size == 1)
                                    finished = true;

                                return true;
                            }
                        }
                        else
                            return false;
                    }
                }
                finally {
                    readLock().unlock();
                }
            }
            else {
                writeLock().lock();

                try {
                    // No events could be added to this batch.
                    finished = true;

                    return false;
                }
                finally {
                    writeLock().unlock();
                }
            }
        }

        /**
         * @return Queue node.
         */
        public ConcurrentLinkedDeque8.Node<Batch> node() {
            return qNode;
        }

        /**
         * @param qNode Queue node.
         */
        public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
            this.qNode = qNode;
        }

        /**
         * Passes through the write lock, which waits for all in-flight {@link #add(Object, long)} calls
         * (they hold the read lock) to complete, and checks that the batch is full or finished.
         *
         * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If wait was interrupted.
         */
        public void finish() throws IgniteInterruptedCheckedException {
            writeLock().lock();

            try {
                // Safety.
                assert cap.get() == 0 || finished;
            }
            finally {
                writeLock().unlock();
            }
        }

        /**
         * @return {@code True} if batch is finished and no more events will be added to it.
         */
        public boolean finished() {
            readLock().lock();

            try {
                return finished;
            }
            finally {
                readLock().unlock();
            }
        }

        /**
         * Gets batch size.
         *
         * @return Batch size.
         */
        public int size() {
            readLock().lock();

            try {
                return evts == null ? 0 : evts.sizex();
            }
            finally {
                readLock().unlock();
            }
        }

        /**
         * @return {@code True} if batch is empty.
         */
        public boolean isEmpty() {
            readLock().lock();

            try {
                return evts == null || evts.isEmpty();
            }
            finally {
                readLock().unlock();
            }
        }

        /**
         * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
         * be added to batch and it can be safely unlinked from the queue.
         *
         * @return {@code True} if batch is empty and finished.
         */
        public boolean emptyFinished() {
            writeLock().lock();

            try {
                return finished && (evts == null || evts.isEmpty());
            }
            finally {
                writeLock().unlock();
            }
        }

        /**
         * Checks if the batch has expired and, if so, marks it finished.
         *
         * @return {@code True} if the batch has expired, {@code false} otherwise.
         */
        public boolean checkExpired() {
            if (U.currentTimeMillis() > batchEndTs) {
                writeLock().lock();

                try {
                    finished = true;

                    return true;
                }
                finally {
                    writeLock().unlock();
                }
            }

            return false;
        }

        /** {@inheritDoc} */
        @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
            readLock().lock();

            try {
                if (evts != null)
                    return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();

                // Batch was shrunk: return an iterator that has no elements.
                return new ConcurrentLinkedDeque8.IteratorEx<E>() {
                    @Override public boolean removex() {
                        throw new NoSuchElementException();
                    }

                    @Override public boolean hasNext() {
                        return false;
                    }

                    @Override public E next() {
                        throw new NoSuchElementException();
                    }

                    @Override public void remove() {
                        throw new NoSuchElementException();
                    }
                };
            }
            finally {
                readLock().unlock();
            }
        }

        /**
         * Polls up to {@code cnt} objects from batch in concurrent fashion.
         *
         * @param cnt Number of objects to poll.
         * @return Collection of polled elements (empty collection in case no events were
         *         present).
         */
        public Collection<E> pollNonBatch(int cnt) {
            readLock().lock();

            try {
                if (evts == null)
                    return Collections.emptyList();

                Collection<E> res = new ArrayList<>(cnt);

                for (int i = 0; i < cnt; i++) {
                    E evt = evts.poll();

                    if (evt != null)
                        res.add(evt);
                    else
                        return res;
                }

                return res;
            }
            finally {
                readLock().unlock();
            }
        }

        /**
         * Shrinks this batch. No events can be polled from it after this method.
         *
         * @return Collection of events contained in batch before shrink (empty collection in
         *         case no events were present).
         */
        public Collection<E> shrink() {
            writeLock().lock();

            try {
                if (evts == null)
                    return Collections.emptyList();

                // Since iterator can concurrently delete elements, we must poll here.
                Collection<E> res = new ArrayList<>(evts.sizex());

                E o;

                while ((o = evts.poll()) != null)
                    res.add(o);

                // Nothing can be polled after shrink.
                evts = null;

                return res;
            }
            finally {
                writeLock().unlock();
            }
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            ConcurrentLinkedDeque8<E> evts0 = evts;

            return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
        }
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java deleted file mode 100644 index 46c582c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java +++ /dev/null @@ -1,462 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.streamer.window; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.streamer.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -
/**
 * Window which is bounded by size and time interval.
 *
 * @param <E> Event type.
 */
public class StreamerBoundedTimeWindow<E> extends StreamerWindowAdapter<E> {
    /** Window structures holder. */
    private AtomicReference<WindowHolder> ref = new AtomicReference<>();

    /** Time interval; events older than {@code U.currentTimeMillis() - timeInterval} are evictable. */
    private long timeInterval;

    /** Window maximum size. Zero disables the size bound. */
    private int maxSize;

    /** Unique flag. */
    private boolean unique;

    /** Event order counter; breaks ties between events that carry the same timestamp. */
    private AtomicLong orderCnt = new AtomicLong();

    /**
     * Gets window maximum size.
     *
     * @return Maximum size.
     */
    public int getMaximumSize() {
        return maxSize;
    }

    /**
     * Sets window maximum size.
     *
     * @param maxSize Max size.
     */
    public void setMaximumSize(int maxSize) {
        this.maxSize = maxSize;
    }

    /**
     * Gets window time interval.
     *
     * @return Time interval.
     */
    public long getTimeInterval() {
        return timeInterval;
    }

    /**
     * Sets window time interval.
     *
     * @param timeInterval Time interval.
     */
    public void setTimeInterval(long timeInterval) {
        this.timeInterval = timeInterval;
    }

    /**
     * Gets window unique flag.
     *
     * @return {@code True} if only unique events should be added to window.
     */
    public boolean isUnique() {
        return unique;
    }

    /**
     * Sets window unique flag.
     *
     * @param unique {@code True} if only unique events should be added to window.
     */
    public void setUnique(boolean unique) {
        this.unique = unique;
    }

    /** {@inheritDoc} */
    @Override public void checkConfiguration() {
        if (timeInterval <= 0)
            throw new IgniteException("Failed to initialize window (timeInterval must be positive): [windowClass=" +
                getClass().getSimpleName() + ", maxSize=" + maxSize + ", timeInterval=" + timeInterval + ", unique=" +
                unique + ']');

        if (maxSize < 0)
            throw new IgniteException("Failed to initialize window (maximumSize cannot be negative): [windowClass=" +
                getClass().getSimpleName() + ", maxSize=" + maxSize + ", timeInterval=" + timeInterval + ", unique=" +
                unique + ']');
    }

    /** {@inheritDoc} */
    @Override protected void stop0() {
        // No-op.
    }

    /** {@inheritDoc} */
    @SuppressWarnings("RedundantCast")
    @Override protected void reset0() {
        ref.set(new WindowHolder(newQueue(), unique ? (Set<Object>)new GridConcurrentHashSet<>() : null,
            new AtomicInteger()));
    }

    /** {@inheritDoc} */
    @Override public int size() {
        return ref.get().size().get();
    }

    /** {@inheritDoc} */
    @Override public int evictionQueueSize() {
        // Get estimate for eviction queue size.
        WindowHolder tup = ref.get();

        GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();

        boolean sizeCheck = maxSize != 0;

        int overflow = tup.size().get() - maxSize;

        long timeBound = U.currentTimeMillis() - timeInterval;

        int idx = 0;
        int cnt = 0;

        for (Holder<E> holder : evtsQueue) {
            // Event is evictable either by position (size overflow) or by age.
            if ((sizeCheck && idx < overflow) || holder.ts < timeBound)
                cnt++;
            else
                // Queue is ordered by timestamp, so the first event that is neither in the overflow
                // nor too old ends the evictable prefix.
                break;

            idx++;
        }

        return cnt;
    }

    /** {@inheritDoc} */
    @Override protected boolean enqueue0(E evt) {
        add(evt, U.currentTimeMillis());

        return true;
    }

    /** {@inheritDoc} */
    @Override protected Collection<E> pollEvicted0(int cnt) {
        Collection<E> res = new ArrayList<>(cnt);

        for (int i = 0; i < cnt; i++) {
            E evicted = pollEvictedInternal();

            if (evicted == null)
                return res;

            res.add(evicted);
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override protected Collection<E> pollEvictedBatch0() {
        E res = pollEvictedInternal();

        if (res == null)
            return Collections.emptyList();

        return Collections.singleton(res);
    }

    /**
     * Polls a single event that is evictable either because the window exceeds its maximum size
     * or because the oldest event is older than the time interval.
     *
     * @return Evicted event or {@code null} if nothing could be evicted.
     */
    @Nullable private E pollEvictedInternal() {
        WindowHolder tup = ref.get();

        AtomicInteger size = tup.size();

        GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();

        long now = U.currentTimeMillis();

        while (true) {
            int curSize = size.get();

            if (maxSize > 0 && curSize > maxSize) {
                // Reserve one slot first; a failed CAS means a concurrent update, so re-read and retry.
                if (size.compareAndSet(curSize, curSize - 1)) {
                    Holder<E> hldr = evtsQueue.pollFirst();

                    if (hldr != null) {
                        if (unique)
                            tup.set().remove(hldr.val);

                        return hldr.val;
                    }
                    else {
                        // No actual events in queue, it means that other thread is just adding event.
                        // Restore the counter and return null as it is a concurrent add call.
                        size.incrementAndGet();

                        return null;
                    }
                }
            }
            else {
                // Check if first entry qualifies for eviction.
                Holder<E> first = evtsQueue.firstx();

                if (first != null && first.ts < now - timeInterval) {
                    // If another thread removed it first, loop and look at the new head.
                    if (evtsQueue.remove(first)) {
                        if (unique)
                            tup.set().remove(first.val);

                        size.decrementAndGet();

                        return first.val;
                    }
                }
                else
                    return null;
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns at most {@code cnt} events; a request for zero events removes nothing.
     * NOTE(review): events are taken from the tail of the queue ({@code pollLast()}), while eviction
     * takes them from the head - confirm this is the intended dequeue order.
     */
    @Override protected Collection<E> dequeue0(int cnt) {
        WindowHolder tup = ref.get();

        AtomicInteger size = tup.size();
        GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();

        Collection<E> resCol = new ArrayList<>(cnt);

        // The bound is checked before a slot is reserved, so that cnt == 0 never polls an event.
        while (resCol.size() < cnt) {
            int curSize = size.get();

            if (curSize <= 0)
                break;

            if (size.compareAndSet(curSize, curSize - 1)) {
                Holder<E> h = evtsQueue.pollLast();

                if (h == null) {
                    // Counter was ahead of the queue (concurrent add in progress), restore it.
                    size.incrementAndGet();

                    break;
                }

                resCol.add(h.val);

                if (unique)
                    tup.set().remove(h.val);
            }
        }

        return resCol;
    }

    /** {@inheritDoc} */
    @Override protected GridStreamerWindowIterator<E> iterator0() {
        final WindowHolder win = ref.get();

        final GridConcurrentSkipListSet<Holder<E>> col = win.collection();
        final Set<Object> set = win.set();

        final Iterator<Holder<E>> it = col.iterator();

        return new GridStreamerWindowIterator<E>() {
            /** Holder returned by the last call to next(). */
            private Holder<E> lastRet;

            @Override public boolean hasNext() {
                return it.hasNext();
            }

            @Override public E next() {
                lastRet = it.next();

                return lastRet.val;
            }

            @Override public E removex() {
                if (lastRet == null)
                    throw new IllegalStateException();

                // Returns null if the holder was already removed (for example by a concurrent poll).
                if (col.remove(lastRet)) {
                    if (set != null)
                        set.remove(lastRet.val);

                    win.size().decrementAndGet();

                    return lastRet.val;
                }
                else
                    return null;
            }
        };
    }

    /**
     * Checks queue consistency. Used in tests.
     */
    void consistencyCheck() {
        WindowHolder win = ref.get();

        assert win.collection().size() == win.size().get();

        if (win.set() != null) {
            // Check no duplicates in collection.

            Collection<Object> vals = new HashSet<>();

            for (Holder<E> evt : win.collection())
                assert vals.add(evt.val);
        }
    }

    /**
     * Creates queue ordered by event timestamp and then by event order.
     *
     * @return New queue.
     */
    private GridConcurrentSkipListSet<Holder<E>> newQueue() {
        return new GridConcurrentSkipListSet<>(new Comparator<Holder<E>>() {
            @Override public int compare(Holder<E> h1, Holder<E> h2) {
                if (h1 == h2)
                    return 0;

                if (h1.ts != h2.ts)
                    return h1.ts < h2.ts ? -1 : 1;

                return h1.order < h2.order ? -1 : 1;
            }
        });
    }

    /**
     * Adds event to the window; when the unique flag is set, an event already present in the
     * uniqueness set is not added.
     *
     * @param evt Event to add.
     * @param ts Event timestamp.
     */
    private void add(E evt, long ts) {
        WindowHolder tup = ref.get();

        if (!unique || tup.set().add(evt)) {
            tup.collection().add(new Holder<>(evt, ts, orderCnt.incrementAndGet()));

            tup.size().incrementAndGet();
        }
    }

    /**
     * @param evts Events to add.
     * @param ts Timestamp for added events.
     */
    private void addAll(Iterable<E> evts, long ts) {
        for (E evt : evts)
            add(evt, ts);
    }

    /**
     * Holder.
     */
    private static class Holder<E> {
        /** Value. */
        private final E val;

        /** Event timestamp. */
        private final long ts;

        /** Event order. */
        private final long order;

        /**
         * @param val Event.
         * @param ts Timestamp.
         * @param order Order.
         */
        private Holder(E val, long ts, long order) {
            this.val = val;
            this.ts = ts;
            this.order = order;
        }
    }

    /**
     * Window holder: holders queue, uniqueness set (may be {@code null}) and size counter.
     */
    @SuppressWarnings("ConstantConditions")
    private class WindowHolder extends GridTuple3<GridConcurrentSkipListSet<Holder<E>>, Set<Object>, AtomicInteger> {
        /** */
        private static final long serialVersionUID = 0L;

        /**
         * Empty constructor required by {@link Externalizable}.
         */
        public WindowHolder() {
            // No-op.
        }

        /**
         * @param col Collection.
         * @param set Set if unique.
         * @param size Size.
         */
        private WindowHolder(@Nullable GridConcurrentSkipListSet<Holder<E>> col,
            @Nullable Set<Object> set, @Nullable AtomicInteger size) {
            super(col, set, size);
        }

        /**
         * @return Holders collection.
         */
        public GridConcurrentSkipListSet<Holder<E>> collection() {
            return get1();
        }

        /**
         * @return Uniqueness set.
         */
        public Set<Object> set() {
            return get2();
        }

        /**
         * @return Size counter.
         */
        public AtomicInteger size() {
            return get3();
        }
    }
}