This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e6b2b75257c8ebd006b838f2b452a70c7e5ffe08
Author: Guillaume Nodet <gno...@gmail.com>
AuthorDate: Fri Oct 12 00:34:51 2018 +0200

    [CAMEL-12818] Remove deprecated stuff
---
 .../org/apache/camel/processor/BatchProcessor.java | 498 ---------------------
 .../org/apache/camel/processor/Resequencer.java    | 458 ++++++++++++++++++-
 2 files changed, 453 insertions(+), 503 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
deleted file mode 100644
index 1e0a2e5..0000000
--- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelExchangeException;
-import org.apache.camel.Exchange;
-import org.apache.camel.Expression;
-import org.apache.camel.Navigate;
-import org.apache.camel.Predicate;
-import org.apache.camel.Processor;
-import org.apache.camel.spi.ExceptionHandler;
-import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.LoggingExceptionHandler;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A base class for any kind of {@link Processor} which implements some kind of batch processing.
- * 
- * @version 
- * @deprecated may be removed in the future when we overhaul the resequencer EIP
- */
-@Deprecated
-public class BatchProcessor extends ServiceSupport implements AsyncProcessor, 
Navigate<Processor>, IdAware {
-
-    public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
-    public static final int DEFAULT_BATCH_SIZE = 100;
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(BatchProcessor.class);
-
-    private String id;
-    private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
-    private int batchSize = DEFAULT_BATCH_SIZE;
-    private int outBatchSize;
-    private boolean groupExchanges;
-    private boolean batchConsumer;
-    private boolean ignoreInvalidExchanges;
-    private boolean reverse;
-    private boolean allowDuplicates;
-    private Predicate completionPredicate;
-    private Expression expression;
-
-    private final CamelContext camelContext;
-    private final Processor processor;
-    private final Collection<Exchange> collection;
-    private ExceptionHandler exceptionHandler;
-
-    private final BatchSender sender;
-
-    public BatchProcessor(CamelContext camelContext, Processor processor, 
Collection<Exchange> collection, Expression expression) {
-        ObjectHelper.notNull(camelContext, "camelContext");
-        ObjectHelper.notNull(processor, "processor");
-        ObjectHelper.notNull(collection, "collection");
-        ObjectHelper.notNull(expression, "expression");
-
-        // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
-        this.camelContext = camelContext;
-        this.processor = processor;
-        this.collection = collection;
-        this.expression = expression;
-        this.sender = new BatchSender();
-        this.exceptionHandler = new LoggingExceptionHandler(camelContext, 
getClass());
-    }
-
-    @Override
-    public String toString() {
-        return "BatchProcessor[to: " + processor + "]";
-    }
-
-    // Properties
-    // -------------------------------------------------------------------------
-
-
-    public Expression getExpression() {
-        return expression;
-    }
-
-    public ExceptionHandler getExceptionHandler() {
-        return exceptionHandler;
-    }
-
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    /**
-     * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
-     * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
-     * 
-     * @param batchSize the size
-     */
-    public void setBatchSize(int batchSize) {
-        // setting batch size to 0 or negative is like disabling it, so we set it as the max value
-        // as the code logic is dependent on a batch size having 1..n value
-        if (batchSize <= 0) {
-            LOG.debug("Disabling batch size, will only be triggered by 
timeout");
-            this.batchSize = Integer.MAX_VALUE;
-        } else {
-            this.batchSize = batchSize;
-        }
-    }
-
-    public int getOutBatchSize() {
-        return outBatchSize;
-    }
-
-    /**
-     * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
-     * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
-     * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
-     * 
-     * @param outBatchSize the size
-     */
-    public void setOutBatchSize(int outBatchSize) {
-        this.outBatchSize = outBatchSize;
-    }
-
-    public long getBatchTimeout() {
-        return batchTimeout;
-    }
-
-    public void setBatchTimeout(long batchTimeout) {
-        this.batchTimeout = batchTimeout;
-    }
-
-    public boolean isGroupExchanges() {
-        return groupExchanges;
-    }
-
-    public void setGroupExchanges(boolean groupExchanges) {
-        this.groupExchanges = groupExchanges;
-    }
-
-    public boolean isBatchConsumer() {
-        return batchConsumer;
-    }
-
-    public void setBatchConsumer(boolean batchConsumer) {
-        this.batchConsumer = batchConsumer;
-    }
-
-    public boolean isIgnoreInvalidExchanges() {
-        return ignoreInvalidExchanges;
-    }
-
-    public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) {
-        this.ignoreInvalidExchanges = ignoreInvalidExchanges;
-    }
-
-    public boolean isReverse() {
-        return reverse;
-    }
-
-    public void setReverse(boolean reverse) {
-        this.reverse = reverse;
-    }
-
-    public boolean isAllowDuplicates() {
-        return allowDuplicates;
-    }
-
-    public void setAllowDuplicates(boolean allowDuplicates) {
-        this.allowDuplicates = allowDuplicates;
-    }
-
-    public Predicate getCompletionPredicate() {
-        return completionPredicate;
-    }
-
-    public void setCompletionPredicate(Predicate completionPredicate) {
-        this.completionPredicate = completionPredicate;
-    }
-
-    public Processor getProcessor() {
-        return processor;
-    }
-
-    public List<Processor> next() {
-        if (!hasNext()) {
-            return null;
-        }
-        List<Processor> answer = new ArrayList<>(1);
-        answer.add(processor);
-        return answer;
-    }
-
-    public boolean hasNext() {
-        return processor != null;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    /**
-     * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
-     * the in queue should be drained to the "out" collection.
-     */
-    private boolean isInBatchCompleted(int num) {
-        return num >= batchSize;
-    }
-
-    /**
-     * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
-     * the out collection should be sent.
-     */
-    private boolean isOutBatchCompleted() {
-        if (outBatchSize == 0) {
-            // out batch is disabled, so go ahead and send.
-            return true;
-        }
-        return collection.size() > 0 && collection.size() >= outBatchSize;
-    }
-
-    /**
-     * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
-     * custom processing before or after an individual exchange is processed
-     */
-    protected void processExchange(Exchange exchange) throws Exception {
-        processor.process(exchange);
-        if (exchange.getException() != null) {
-            getExceptionHandler().handleException("Error processing aggregated 
exchange: " + exchange, exchange.getException());
-        }
-    }
-
-    protected void doStart() throws Exception {
-        ServiceHelper.startServices(processor);
-        sender.start();
-    }
-
-    protected void doStop() throws Exception {
-        sender.cancel();
-        ServiceHelper.stopServices(processor);
-        collection.clear();
-    }
-
-    public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    /**
-     * Enqueues an exchange for later batch processing.
-     */
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        try {
-            // if batch consumer is enabled then we need to adjust the batch size
-            // with the size from the batch consumer
-            if (isBatchConsumer()) {
-                int size = exchange.getProperty(Exchange.BATCH_SIZE, 
Integer.class);
-                if (batchSize != size) {
-                    batchSize = size;
-                    LOG.trace("Using batch consumer completion, so setting 
batch size to: {}", batchSize);
-                }
-            }
-
-            // validate that the exchange can be used
-            if (!isValid(exchange)) {
-                if (isIgnoreInvalidExchanges()) {
-                    LOG.debug("Invalid Exchange. This Exchange will be 
ignored: {}", exchange);
-                } else {
-                    throw new CamelExchangeException("Exchange is not valid to 
be used by the BatchProcessor", exchange);
-                }
-            } else {
-                // exchange is valid so enqueue the exchange
-                sender.enqueueExchange(exchange);
-            }
-        } catch (Throwable e) {
-            exchange.setException(e);
-        }
-        callback.done(true);
-        return true;
-    }
-
-    /**
-     * Is the given exchange valid to be used.
-     *
-     * @param exchange the given exchange
-     * @return <tt>true</tt> if valid, <tt>false</tt> otherwise
-     */
-    private boolean isValid(Exchange exchange) {
-        Object result = null;
-        try {
-            result = expression.evaluate(exchange, Object.class);
-        } catch (Exception e) {
-            // ignore
-        }
-        return result != null;
-    }
-
-    /**
-     * Sender thread for queued-up exchanges.
-     */
-    private class BatchSender extends Thread {
-
-        private Queue<Exchange> queue;
-        private Lock queueLock = new ReentrantLock();
-        private final AtomicBoolean exchangeEnqueued = new AtomicBoolean();
-        private final Queue<String> completionPredicateMatched = new 
ConcurrentLinkedQueue<>();
-        private Condition exchangeEnqueuedCondition = queueLock.newCondition();
-
-        BatchSender() {
-            
super(camelContext.getExecutorServiceManager().resolveThreadName("Batch 
Sender"));
-            this.queue = new LinkedList<>();
-        }
-
-        @Override
-        public void run() {
-            // Wait until one of either:
-            // * an exchange being queued;
-            // * the batch timeout expiring; or
-            // * the thread being cancelled.
-            //
-            // If an exchange is queued then we need to determine whether the
-            // batch is complete. If it is complete then we send out the batched
-            // exchanges. Otherwise we move back into our wait state.
-            //
-            // If the batch times out then we send out the batched exchanges
-            // collected so far.
-            //
-            // If we receive an interrupt then all blocking operations are
-            // interrupted and our thread terminates.
-            //
-            // The goal of the following algorithm in terms of synchronisation
-            // is to provide fine grained locking i.e. retaining the lock only
-            // when required. Special consideration is given to releasing the
-            // lock when calling an overloaded method i.e. sendExchanges. 
-            // Unlocking is important as the process of sending out the exchanges
-            // would otherwise block new exchanges from being queued.
-
-            queueLock.lock();
-            try {
-                do {
-                    try {
-                        if (!exchangeEnqueued.get()) {
-                            LOG.trace("Waiting for new exchange to arrive or 
batchTimeout to occur after {} ms.", batchTimeout);
-                            exchangeEnqueuedCondition.await(batchTimeout, 
TimeUnit.MILLISECONDS);
-                        }
-
-                        // if the completion predicate was triggered then there is an exchange id which denotes when to complete
-                        String id = null;
-                        if (!completionPredicateMatched.isEmpty()) {
-                            id = completionPredicateMatched.poll();
-                        }
-
-                        if (id != null || !exchangeEnqueued.get()) {
-                            if (id != null) {
-                                LOG.trace("Collecting exchanges to be 
aggregated triggered by completion predicate");
-                            } else {
-                                LOG.trace("Collecting exchanges to be 
aggregated triggered by batch timeout");
-                            }
-                            drainQueueTo(collection, batchSize, id);
-                        } else {
-                            exchangeEnqueued.set(false);
-                            boolean drained = false;
-                            while (isInBatchCompleted(queue.size())) {
-                                drained = true;
-                                drainQueueTo(collection, batchSize, id);
-                            }
-                            if (drained) {
-                                LOG.trace("Collecting exchanges to be 
aggregated triggered by new exchanges received");
-                            }
-
-                            if (!isOutBatchCompleted()) {
-                                continue;
-                            }
-                        }
-
-                        queueLock.unlock();
-                        try {
-                            try {
-                                sendExchanges();
-                            } catch (Throwable t) {
-                                // a fail safe to handle all exceptions being thrown
-                                getExceptionHandler().handleException(t);
-                            }
-                        } finally {
-                            queueLock.lock();
-                        }
-
-                    } catch (InterruptedException e) {
-                        break;
-                    }
-
-                } while (isRunAllowed());
-
-            } finally {
-                queueLock.unlock();
-            }
-        }
-
-        /**
-         * This method should be called with queueLock held
-         */
-        private void drainQueueTo(Collection<Exchange> collection, int 
batchSize, String exchangeId) {
-            for (int i = 0; i < batchSize; ++i) {
-                Exchange e = queue.poll();
-                if (e != null) {
-                    try {
-                        collection.add(e);
-                    } catch (Exception t) {
-                        e.setException(t);
-                    } catch (Throwable t) {
-                        getExceptionHandler().handleException(t);
-                    }
-                    if (exchangeId != null && 
exchangeId.equals(e.getExchangeId())) {
-                        // this batch is complete so stop draining
-                        break;
-                    }
-                } else {
-                    break;
-                }
-            }
-        }
-
-        public void cancel() {
-            interrupt();
-        }
-
-        public void enqueueExchange(Exchange exchange) {
-            LOG.debug("Received exchange to be batched: {}", exchange);
-            queueLock.lock();
-            try {
-                // pre test whether the completion predicate matched
-                if (completionPredicate != null) {
-                    boolean matches = completionPredicate.matches(exchange);
-                    if (matches) {
-                        LOG.trace("Exchange matched completion predicate: {}", 
exchange);
-                        // add this exchange to the list of exchanges which marks the batch as complete
-                        
completionPredicateMatched.add(exchange.getExchangeId());
-                    }
-                }
-                queue.add(exchange);
-                exchangeEnqueued.set(true);
-                exchangeEnqueuedCondition.signal();
-            } finally {
-                queueLock.unlock();
-            }
-        }
-        
-        private void sendExchanges() throws Exception {
-            Iterator<Exchange> iter = collection.iterator();
-            while (iter.hasNext()) {
-                Exchange exchange = iter.next();
-                iter.remove();
-                try {
-                    LOG.debug("Sending aggregated exchange: {}", exchange);
-                    processExchange(exchange);
-                } catch (Throwable t) {
-                    // must catch throwable to avoid growing memory
-                    getExceptionHandler().handleException("Error processing 
aggregated exchange: " + exchange, t);
-                }
-            }
-        }
-    }
-
-}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java b/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
index a462839..2188777 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -16,16 +16,40 @@
  */
 package org.apache.camel.processor;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.Navigate;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.LoggingExceptionHandler;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExpressionComparator;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 
 /**
  * An implementation of the <a href="http://camel.apache.org/resequencer.html">Resequencer</a>
@@ -33,10 +57,29 @@ import org.apache.camel.util.ExpressionComparator;
  *
  * @version 
  */
-@SuppressWarnings("deprecation")
-public class Resequencer extends BatchProcessor implements Traceable {
+public class Resequencer extends ServiceSupport implements AsyncProcessor, 
Navigate<Processor>, IdAware, Traceable {
 
-    // TODO: Rework to avoid using BatchProcessor
+    public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
+    public static final int DEFAULT_BATCH_SIZE = 100;
+
+    private String id;
+    private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
+    private int batchSize = DEFAULT_BATCH_SIZE;
+    private int outBatchSize;
+    private boolean groupExchanges;
+    private boolean batchConsumer;
+    private boolean ignoreInvalidExchanges;
+    private boolean reverse;
+    private boolean allowDuplicates;
+    private Predicate completionPredicate;
+    private Expression expression;
+
+    private final CamelContext camelContext;
+    private final Processor processor;
+    private final Collection<Exchange> collection;
+    private ExceptionHandler exceptionHandler;
+
+    private final BatchSender sender;
 
     public Resequencer(CamelContext camelContext, Processor processor, 
Expression expression) {
         this(camelContext, processor, createSet(expression, false, false), 
expression);
@@ -48,7 +91,18 @@ public class Resequencer extends BatchProcessor implements Traceable {
     }
 
     public Resequencer(CamelContext camelContext, Processor processor, 
Set<Exchange> collection, Expression expression) {
-        super(camelContext, processor, collection, expression);
+        ObjectHelper.notNull(camelContext, "camelContext");
+        ObjectHelper.notNull(processor, "processor");
+        ObjectHelper.notNull(collection, "collection");
+        ObjectHelper.notNull(expression, "expression");
+
+        // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
+        this.camelContext = camelContext;
+        this.processor = processor;
+        this.collection = collection;
+        this.expression = expression;
+        this.sender = new BatchSender();
+        this.exceptionHandler = new LoggingExceptionHandler(camelContext, 
getClass());
     }
 
     @Override
@@ -60,6 +114,139 @@ public class Resequencer extends BatchProcessor implements Traceable {
         return "resequencer";
     }
 
+    // Properties
+    // -------------------------------------------------------------------------
+
+
+    public Expression getExpression() {
+        return expression;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
+     * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
+     *
+     * @param batchSize the size
+     */
+    public void setBatchSize(int batchSize) {
+        // setting batch size to 0 or negative is like disabling it, so we set it as the max value
+        // as the code logic is dependent on a batch size having 1..n value
+        if (batchSize <= 0) {
+            log.debug("Disabling batch size, will only be triggered by 
timeout");
+            this.batchSize = Integer.MAX_VALUE;
+        } else {
+            this.batchSize = batchSize;
+        }
+    }
+
+    public int getOutBatchSize() {
+        return outBatchSize;
+    }
+
+    /**
+     * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
+     * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
+     * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
+     *
+     * @param outBatchSize the size
+     */
+    public void setOutBatchSize(int outBatchSize) {
+        this.outBatchSize = outBatchSize;
+    }
+
+    public long getBatchTimeout() {
+        return batchTimeout;
+    }
+
+    public void setBatchTimeout(long batchTimeout) {
+        this.batchTimeout = batchTimeout;
+    }
+
+    public boolean isGroupExchanges() {
+        return groupExchanges;
+    }
+
+    public void setGroupExchanges(boolean groupExchanges) {
+        this.groupExchanges = groupExchanges;
+    }
+
+    public boolean isBatchConsumer() {
+        return batchConsumer;
+    }
+
+    public void setBatchConsumer(boolean batchConsumer) {
+        this.batchConsumer = batchConsumer;
+    }
+
+    public boolean isIgnoreInvalidExchanges() {
+        return ignoreInvalidExchanges;
+    }
+
+    public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) {
+        this.ignoreInvalidExchanges = ignoreInvalidExchanges;
+    }
+
+    public boolean isReverse() {
+        return reverse;
+    }
+
+    public void setReverse(boolean reverse) {
+        this.reverse = reverse;
+    }
+
+    public boolean isAllowDuplicates() {
+        return allowDuplicates;
+    }
+
+    public void setAllowDuplicates(boolean allowDuplicates) {
+        this.allowDuplicates = allowDuplicates;
+    }
+
+    public Predicate getCompletionPredicate() {
+        return completionPredicate;
+    }
+
+    public void setCompletionPredicate(Predicate completionPredicate) {
+        this.completionPredicate = completionPredicate;
+    }
+
+    public Processor getProcessor() {
+        return processor;
+    }
+
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<>(1);
+        answer.add(processor);
+        return answer;
+    }
+
+    public boolean hasNext() {
+        return processor != null;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 
@@ -99,4 +286,265 @@ public class Resequencer extends BatchProcessor implements 
Traceable {
         return new TreeSet<>(answer);
     }
 
-}
+    /**
+     * A strategy method to decide if the "in" batch is completed. That is,
+     * whether the resulting exchanges in the in queue should be drained to
+     * the "out" collection.
+     *
+     * @param num the number of exchanges to compare against the batch size
+     * @return <tt>true</tt> if <tt>num</tt> has reached the batch size
+     */
+    private boolean isInBatchCompleted(int num) {
+        return num >= batchSize;
+    }
+
+    /**
+     * A strategy method to decide if the "out" batch is completed. That is,
+     * whether the exchanges gathered in the out collection should be sent.
+     *
+     * @return <tt>true</tt> if the out batch size is <tt>0</tt> (disabled), or if the
+     *         collection is non-empty and has reached the out batch size
+     */
+    private boolean isOutBatchCompleted() {
+        // an out batch size of 0 means the out batch is disabled, so go ahead and send
+        return outBatchSize == 0
+            || (collection.size() > 0 && collection.size() >= outBatchSize);
+    }
+
+    /**
+     * Strategy method to process a single exchange of the batch. Derived
+     * classes may override it to perform custom processing before or after
+     * an individual exchange is processed.
+     *
+     * @param exchange the exchange to send to the target processor
+     * @throws Exception if the target processor throws
+     */
+    protected void processExchange(Exchange exchange) throws Exception {
+        processor.process(exchange);
+
+        Exception failure = exchange.getException();
+        if (failure == null) {
+            return;
+        }
+        // the processor reported a failure on the exchange instead of throwing
+        getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, failure);
+    }
+
+    /**
+     * Starts the target processor and then the sender thread.
+     */
+    protected void doStart() throws Exception {
+        // start the processor first so it is ready before the sender thread runs
+        ServiceHelper.startService(processor);
+        sender.start();
+    }
+
+    /**
+     * Interrupts the sender thread, stops the target processor and discards
+     * the exchanges still held in the out collection. The sender thread is
+     * not joined here.
+     */
+    protected void doStop() throws Exception {
+        sender.cancel();
+        ServiceHelper.stopService(processor);
+        collection.clear();
+    }
+
+    /**
+     * Synchronous entry point; delegates to
+     * {@link #process(Exchange, AsyncCallback)} through {@link AsyncProcessorHelper}.
+     */
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    /**
+     * Enqueues an exchange for later batch processing.
+     * <p/>
+     * The exchange is only handed over to the sender thread here. Any failure
+     * (including an invalid exchange when invalid exchanges are not ignored)
+     * is stored on the exchange, and the callback is always completed
+     * synchronously.
+     *
+     * @param exchange the exchange to enqueue
+     * @param callback completed with <tt>true</tt> before this method returns
+     * @return always <tt>true</tt>, as the work done here is synchronous
+     */
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            // if batch consumer is enabled then we need to adjust the batch size
+            // with the size from the batch consumer
+            if (isBatchConsumer()) {
+                // read as Integer: unboxing a missing property straight into an int
+                // fails with a NullPointerException that hides the real cause
+                Integer size = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
+                if (size == null) {
+                    throw new CamelExchangeException("Batch consumer is enabled but the exchange has no "
+                        + Exchange.BATCH_SIZE + " property", exchange);
+                }
+                if (batchSize != size) {
+                    batchSize = size;
+                    log.trace("Using batch consumer completion, so setting batch size to: {}", batchSize);
+                }
+            }
+
+            // validate that the exchange can be used
+            if (!isValid(exchange)) {
+                if (isIgnoreInvalidExchanges()) {
+                    log.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange);
+                } else {
+                    throw new CamelExchangeException("Exchange is not valid to be used by the BatchProcessor", exchange);
+                }
+            } else {
+                // exchange is valid so enqueue the exchange
+                sender.enqueueExchange(exchange);
+            }
+        } catch (Throwable e) {
+            // report the failure on the exchange rather than throwing to the caller
+            exchange.setException(e);
+        }
+        callback.done(true);
+        return true;
+    }
+
+    /**
+     * Is the given exchange valid to be used.
+     * <p/>
+     * An exchange is valid when the expression evaluates to a non-null value
+     * for it; an evaluation failure counts as invalid.
+     *
+     * @param exchange the given exchange
+     * @return <tt>true</tt> if valid, <tt>false</tt> otherwise
+     */
+    private boolean isValid(Exchange exchange) {
+        try {
+            return expression.evaluate(exchange, Object.class) != null;
+        } catch (Exception e) {
+            // deliberately ignored: a failing evaluation means the exchange is not valid
+            return false;
+        }
+    }
+
+    /**
+     * Sender thread for queued-up exchanges.
+     * <p/>
+     * Exchanges are handed over through {@link #enqueueExchange(Exchange)} into
+     * the in queue. The thread drains them into the out collection of the
+     * enclosing class and sends them through
+     * {@link Resequencer#processExchange(Exchange)}.
+     */
+    private class BatchSender extends Thread {
+
+        // the in queue; accessed only while queueLock is held
+        private Queue<Exchange> queue;
+        private Lock queueLock = new ReentrantLock();
+        // set when an exchange is enqueued, reset by run() once the arrival is handled
+        private final AtomicBoolean exchangeEnqueued = new AtomicBoolean();
+        // ids of the enqueued exchanges that matched the completion predicate
+        private final Queue<String> completionPredicateMatched = new 
ConcurrentLinkedQueue<>();
+        // signalled by enqueueExchange to wake up run()
+        private Condition exchangeEnqueuedCondition = queueLock.newCondition();
+
+        /**
+         * Creates the sender thread with an empty in queue; the thread name is
+         * resolved through the executor service manager of the CamelContext.
+         */
+        BatchSender() {
+            
super(camelContext.getExecutorServiceManager().resolveThreadName("Batch 
Sender"));
+            this.queue = new LinkedList<>();
+        }
+
+        @Override
+        public void run() {
+            // Wait until one of either:
+            // * an exchange being queued;
+            // * the batch timeout expiring; or
+            // * the thread being cancelled.
+            //
+            // If an exchange is queued then we need to determine whether the
+            // batch is complete. If it is complete then we send out the
+            // batched exchanges. Otherwise we move back into our wait state.
+            //
+            // If the batch times out then we send out the batched exchanges
+            // collected so far.
+            //
+            // If we receive an interrupt then all blocking operations are
+            // interrupted and our thread terminates.
+            //
+            // The goal of the following algorithm in terms of synchronisation
+            // is to provide fine grained locking i.e. retaining the lock only
+            // when required. Special consideration is given to releasing the
+            // lock when calling an overloaded method i.e. sendExchanges.
+            // Unlocking is important as the process of sending out the
+            // exchanges would otherwise block new exchanges from being queued.
+
+            queueLock.lock();
+            try {
+                do {
+                    try {
+                        if (!exchangeEnqueued.get()) {
+                            log.trace("Waiting for new exchange to arrive or 
batchTimeout to occur after {} ms.", batchTimeout);
+                            exchangeEnqueuedCondition.await(batchTimeout, 
TimeUnit.MILLISECONDS);
+                        }
+
+                        // if the completion predicate was triggered then there is
+                        // an exchange id which denotes when to complete
+                        String id = null;
+                        if (!completionPredicateMatched.isEmpty()) {
+                            id = completionPredicateMatched.poll();
+                        }
+
+                        // either a completion predicate matched, or no exchange was
+                        // enqueued (the wait ended without a new arrival)
+                        if (id != null || !exchangeEnqueued.get()) {
+                            if (id != null) {
+                                log.trace("Collecting exchanges to be 
aggregated triggered by completion predicate");
+                            } else {
+                                log.trace("Collecting exchanges to be 
aggregated triggered by batch timeout");
+                            }
+                            drainQueueTo(collection, batchSize, id);
+                        } else {
+                            // a new exchange arrived: drain for as long as the in
+                            // queue holds at least a full batch
+                            exchangeEnqueued.set(false);
+                            boolean drained = false;
+                            while (isInBatchCompleted(queue.size())) {
+                                drained = true;
+                                drainQueueTo(collection, batchSize, id);
+                            }
+                            if (drained) {
+                                log.trace("Collecting exchanges to be 
aggregated triggered by new exchanges received");
+                            }
+
+                            // out batch not complete yet: skip sending and go back
+                            // to waiting, still holding the lock
+                            if (!isOutBatchCompleted()) {
+                                continue;
+                            }
+                        }
+
+                        // release the lock while sending and re-acquire it afterwards
+                        queueLock.unlock();
+                        try {
+                            try {
+                                sendExchanges();
+                            } catch (Throwable t) {
+                                // a fail safe to handle all exceptions being thrown
+                                getExceptionHandler().handleException(t);
+                            }
+                        } finally {
+                            queueLock.lock();
+                        }
+
+                    } catch (InterruptedException e) {
+                        // interrupted (see cancel()): leave the loop so the thread terminates
+                        break;
+                    }
+
+                } while (isRunAllowed());
+
+            } finally {
+                queueLock.unlock();
+            }
+        }
+
+        /**
+         * Moves up to <tt>batchSize</tt> exchanges from the in queue into the
+         * given collection. Draining stops early when the queue is empty, or
+         * right after the exchange with the given id has been moved.
+         * <p/>
+         * If adding to the collection fails with an exception, the exception is
+         * stored on the exchange; any other throwable is passed to the
+         * exception handler.
+         * <p/>
+         * This method should be called with queueLock held
+         *
+         * @param collection the collection to drain into (shadows the field of the enclosing class)
+         * @param batchSize  the maximum number of exchanges to move (shadows the field of the enclosing class)
+         * @param exchangeId id of the exchange that completes the batch, or <tt>null</tt> if there is none
+         */
+        private void drainQueueTo(Collection<Exchange> collection, int 
batchSize, String exchangeId) {
+            for (int i = 0; i < batchSize; ++i) {
+                Exchange e = queue.poll();
+                if (e != null) {
+                    try {
+                        collection.add(e);
+                    } catch (Exception t) {
+                        e.setException(t);
+                    } catch (Throwable t) {
+                        getExceptionHandler().handleException(t);
+                    }
+                    if (exchangeId != null && 
exchangeId.equals(e.getExchangeId())) {
+                        // this batch is complete so stop draining
+                        break;
+                    }
+                } else {
+                    // the in queue is empty
+                    break;
+                }
+            }
+        }
+
+        /**
+         * Cancels this sender by interrupting its thread.
+         */
+        public void cancel() {
+            interrupt();
+        }
+
+        /**
+         * Adds the exchange to the in queue and signals the sender thread.
+         * If a completion predicate is set and matches the exchange, the
+         * exchange id is recorded first so the sender knows where the batch ends.
+         *
+         * @param exchange the exchange to enqueue
+         */
+        public void enqueueExchange(Exchange exchange) {
+            log.debug("Received exchange to be batched: {}", exchange);
+            queueLock.lock();
+            try {
+                // pre test whether the completion predicate matched
+                if (completionPredicate != null) {
+                    boolean matches = completionPredicate.matches(exchange);
+                    if (matches) {
+                        log.trace("Exchange matched completion predicate: {}", 
exchange);
+                        // add this exchange to the list of exchanges which
+                        // marks the batch as complete
+                        
completionPredicateMatched.add(exchange.getExchangeId());
+                    }
+                }
+                queue.add(exchange);
+                exchangeEnqueued.set(true);
+                exchangeEnqueuedCondition.signal();
+            } finally {
+                queueLock.unlock();
+            }
+        }
+
+        /**
+         * Sends every exchange held in the out collection, removing each one
+         * from the collection before it is processed. A failure while
+         * processing one exchange is passed to the exception handler and the
+         * remaining exchanges are still sent.
+         */
+        private void sendExchanges() throws Exception {
+            Iterator<Exchange> iter = collection.iterator();
+            while (iter.hasNext()) {
+                Exchange exchange = iter.next();
+                iter.remove();
+                try {
+                    log.debug("Sending aggregated exchange: {}", exchange);
+                    processExchange(exchange);
+                } catch (Throwable t) {
+                    // must catch throwable to avoid growing memory
+                    getExceptionHandler().handleException("Error processing 
aggregated exchange: " + exchange, t);
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file

Reply via email to