This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/main by this push:
     new fab7ae7  CAMEL-17810: add initial support for the resume API with the master component
fab7ae7 is described below

commit fab7ae7c86ae514da4bd213c80296c9e63c4f6b6
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Fri Mar 18 15:18:14 2022 +0100

    CAMEL-17810: add initial support for the resume API with the master component
---
 .../camel/component/master/MasterConsumer.java | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index 8489ebe..a79ce85 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -21,6 +21,8 @@ import java.util.Optional;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
+import org.apache.camel.ResumeAware;
+import org.apache.camel.ResumeStrategy;
 import org.apache.camel.StartupListener;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.api.management.ManagedAttribute;
@@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * A consumer which is only really active when the {@link CamelClusterView} has the leadership.
  */
 @ManagedResource(description = "Managed Master Consumer")
-public class MasterConsumer extends DefaultConsumer {
+public class MasterConsumer extends DefaultConsumer implements ResumeAware {
     private static final transient Logger LOG = LoggerFactory.getLogger(MasterConsumer.class);
 
     private final CamelClusterService clusterService;
@@ -48,6 +50,7 @@ public class MasterConsumer extends DefaultConsumer {
     private final CamelClusterEventListener.Leadership leadershipListener;
     private volatile Consumer delegatedConsumer;
     private volatile CamelClusterView view;
+    private ResumeStrategy resumeStrategy;
 
     public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor, CamelClusterService clusterService) {
         super(masterEndpoint, processor);
@@ -60,6 +63,16 @@ public class MasterConsumer extends DefaultConsumer {
     }
 
     @Override
+    public ResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    @Override
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
+
+    @Override
     protected void doStart() throws Exception {
         super.doStart();
 
@@ -125,6 +138,11 @@ public class MasterConsumer extends DefaultConsumer {
             getEndpoint().getCamelContext().addStartupListener((StartupListener) delegatedConsumer);
         }
 
+        if (delegatedConsumer instanceof ResumeAware) {
+            LOG.info("Setting up the resume strategy for the delegated consumer");
+            ((ResumeAware) delegatedConsumer).setResumeStrategy(resumeStrategy);
+        }
+
         ServiceHelper.startService(delegatedEndpoint, delegatedConsumer);
 
         LOG.info("Leadership taken. Consumer started: {}", delegatedEndpoint);