This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 59a91a4 CAMEL-17121: converted camel-smpp producer to the repeatable tasks 59a91a4 is described below commit 59a91a4d4924eebcdd460cbde76b925b9a98fb79 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Nov 10 11:52:20 2021 +0100 CAMEL-17121: converted camel-smpp producer to the repeatable tasks --- .../apache/camel/component/smpp/SmppConsumer.java | 27 +++----- .../apache/camel/component/smpp/SmppProducer.java | 71 +++++++++------------- .../org/apache/camel/component/smpp/SmppUtils.java | 35 +++++++++++ 3 files changed, 73 insertions(+), 60 deletions(-) diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java index 4a740aa..2368bba 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java @@ -17,15 +17,11 @@ package org.apache.camel.component.smpp; import java.io.IOException; -import java.time.Duration; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.task.BlockingTask; -import org.apache.camel.support.task.Tasks; -import org.apache.camel.support.task.budget.Budgets; import org.jsmpp.DefaultPDUReader; import org.jsmpp.DefaultPDUSender; import org.jsmpp.SynchronizedPDUSender; @@ -42,6 +38,10 @@ import org.jsmpp.util.DefaultComposer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping; +import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed; +import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask; + /** * An implementation of consumer which use the SMPP protocol */ @@ -143,11 +143,11 @@ public class SmppConsumer extends DefaultConsumer { } private boolean doReconnect() { - if (isStopping() || isStopped()) { + if (isServiceStopping(this)) { return true; } - if (session == null || session.getSessionState().equals(SessionState.CLOSED)) { + if (isSessionClosed(session)) { try { LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString()); session = createSession(); @@ -165,19 +165,8 @@ public class SmppConsumer extends DefaultConsumer { private void reconnect(final long initialReconnectDelay) { if (reconnectLock.tryLock()) { - final String taskName = "smpp-reconnect"; - ScheduledExecutorService service = getEndpoint().getCamelContext().getExecutorServiceManager() - .newSingleThreadScheduledExecutor(this, taskName); - - BlockingTask task = Tasks.backgroundTask() - .withBudget(Budgets.iterationTimeBudget() - .withInitialDelay(Duration.ofMillis(initialReconnectDelay)) - .withMaxIterations(configuration.getMaxReconnect()) - .withUnlimitedDuration() - .build()) - .withScheduledExecutor(service) - .withName(taskName) - .build(); + BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay, + configuration.getMaxReconnect()); try { task.run(this::doReconnect); diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java index 1965eb1..a7bc510 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.support.DefaultProducer; +import org.apache.camel.support.task.BlockingTask; import org.jsmpp.DefaultPDUReader; import org.jsmpp.DefaultPDUSender; import org.jsmpp.SynchronizedPDUSender; @@ -37,6 +38,10 @@ import org.jsmpp.util.DefaultComposer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping; +import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed; +import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask; + /** * An implementation of @{link Producer} which use the SMPP protocol */ @@ -178,54 +183,38 @@ public class SmppProducer extends DefaultProducer { private void reconnect(final long initialReconnectDelay) { if (connectLock.tryLock()) { - try { - Runnable r = new Runnable() { - public void run() { - boolean reconnected = false; - - LOG.info("Schedule reconnect after {} millis", initialReconnectDelay); - try { - Thread.sleep(initialReconnectDelay); - } catch (InterruptedException e) { - } - - int attempt = 0; - while (!(isStopping() || isStopped()) - && (session == null || session.getSessionState().equals(SessionState.CLOSED)) - && attempt < configuration.getMaxReconnect()) { - try { - attempt++; - LOG.info("Trying to reconnect to {} - attempt #{}", getEndpoint().getConnectionString(), - attempt); - session = createSession(); - reconnected = true; - } catch (IOException e) { - LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString()); - closeSession(); - try { - Thread.sleep(configuration.getReconnectDelay()); - } catch (InterruptedException ee) { - } - } - } - - if (reconnected) { - LOG.info("Reconnected to {}", getEndpoint().getConnectionString()); - } - } - }; + BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay, + configuration.getMaxReconnect()); - Thread t = new Thread(r); - t.start(); - t.join(); - } catch (InterruptedException e) { - // noop + try { + task.run(this::doReconnect); } finally { connectLock.unlock(); } } } + private boolean doReconnect() { + if (isServiceStopping(this)) { + return true; + } + + if (isSessionClosed(session)) { + try { + LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString()); + session = createSession(); + return true; + } catch (IOException e) { + LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString()); + closeSession(); + + return false; + } + } + + return true; + } + @Override public SmppEndpoint getEndpoint() { return (SmppEndpoint) super.getEndpoint(); diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java index 2f887a9..2d2eaf0 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java @@ -16,13 +16,22 @@ */ package org.apache.camel.component.smpp; +import java.time.Duration; import java.util.Calendar; import java.util.Date; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.camel.Endpoint; +import org.apache.camel.support.service.BaseService; +import org.apache.camel.support.task.BlockingTask; +import org.apache.camel.support.task.Tasks; +import org.apache.camel.support.task.budget.Budgets; import org.jsmpp.bean.Alphabet; import org.jsmpp.bean.DataSm; import org.jsmpp.bean.SubmitMulti; import org.jsmpp.bean.SubmitSm; +import org.jsmpp.extra.SessionState; +import org.jsmpp.session.SMPPSession; import org.jsmpp.util.AbsoluteTimeFormatter; import org.jsmpp.util.TimeFormatter; @@ -263,4 +272,30 @@ public final class SmppUtils { } return dest; } + + public static boolean isServiceStopping(BaseService service) { + return service.isStopping() || service.isStopped(); + } + + public static boolean isSessionClosed(SMPPSession session) { + return session == null || session.getSessionState().equals(SessionState.CLOSED); + } + + public static BlockingTask newReconnectTask( + BaseService source, Endpoint endpoint, long initialReconnectDelay, + int maxReconnect) { + final String taskName = "smpp-reconnect"; + ScheduledExecutorService service = endpoint.getCamelContext().getExecutorServiceManager() + .newSingleThreadScheduledExecutor(source, taskName); + + return Tasks.backgroundTask() + .withBudget(Budgets.iterationTimeBudget() + .withInitialDelay(Duration.ofMillis(initialReconnectDelay)) + .withMaxIterations(maxReconnect) + .withUnlimitedDuration() + .build()) + .withScheduledExecutor(service) + .withName(taskName) + .build(); + } }