This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit f13c75dd416edfffffee0032bc03cd9debafd002 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Fri Jun 10 14:49:11 2022 +0200 CAMEL-18127: make part of the resume logic reusable --- .../component/cassandra/CassandraConsumer.java | 21 +---- .../component/couchbase/CouchbaseConsumer.java | 24 +----- .../camel/component/couchdb/CouchDbConsumer.java | 21 +---- .../camel/support/resume/ResumeStrategyHelper.java | 94 ++++++++++++++++++++++ 4 files changed, 100 insertions(+), 60 deletions(-) diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java index 0f87544b632..e8216140ce4 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java @@ -22,13 +22,10 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; -import org.apache.camel.resume.ResumeAction; -import org.apache.camel.resume.ResumeActionAware; -import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; import org.apache.camel.support.ScheduledPollConsumer; -import org.apache.camel.util.ObjectHelper; +import org.apache.camel.support.resume.ResumeStrategyHelper; import static org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION; @@ -88,21 +85,7 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw preparedStatement = getEndpoint().prepareStatement(); } - if (resumeStrategy != null) { - resumeStrategy.loadCache(); - - ResumeAdapter resumeAdapter = resumeStrategy.getAdapter(ResumeAdapter.class); - if (resumeAdapter != null) { - if (resumeAdapter instanceof ResumeActionAware) { - ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry() - .lookupByName(CASSANDRA_RESUME_ACTION); - ObjectHelper.notNull(action, "The resume action cannot be null", this); - - ((ResumeActionAware) resumeAdapter).setResumeAction(action); - } - resumeAdapter.resume(); - } - } + ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this, resumeStrategy, CASSANDRA_RESUME_ACTION); super.doStart(); } diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java index 1bf14d23ae5..f82621ca63a 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java @@ -26,13 +26,10 @@ import com.couchbase.client.java.view.ViewResult; import com.couchbase.client.java.view.ViewRow; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.resume.ResumeAction; -import org.apache.camel.resume.ResumeActionAware; -import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; import org.apache.camel.support.DefaultScheduledPollConsumer; -import org.apache.camel.util.ObjectHelper; +import org.apache.camel.support.resume.ResumeStrategyHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,24 +97,7 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R protected void doStart() throws Exception { super.doStart(); - if (resumeStrategy != null) { - LOG.debug("Loading the resume cache"); - resumeStrategy.loadCache(); - - LOG.info("Couchbase consumer running with resume strategy enabled"); - - ResumeAdapter resumeAdapter = resumeStrategy.getAdapter(ResumeAdapter.class); - if (resumeAdapter != null) { - if (resumeAdapter instanceof ResumeActionAware) { - ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry() - .lookupByName(COUCHBASE_RESUME_ACTION); - ObjectHelper.notNull(action, "The resume action cannot be null", this); - - ((ResumeActionAware) resumeAdapter).setResumeAction(action); - } - resumeAdapter.resume(); - } - } + ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this, resumeStrategy, COUCHBASE_RESUME_ACTION); } @Override diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java index ff34cc704fd..5ce86edd3dc 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java @@ -21,13 +21,10 @@ import java.util.concurrent.ExecutorService; import com.google.gson.JsonObject; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.resume.ResumeAction; -import org.apache.camel.resume.ResumeActionAware; -import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; import org.apache.camel.support.DefaultConsumer; -import org.apache.camel.util.ObjectHelper; +import org.apache.camel.support.resume.ResumeStrategyHelper; import static org.apache.camel.component.couchdb.CouchDbConstants.COUCHDB_RESUME_ACTION; @@ -68,21 +65,7 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Resu @Override protected void doStart() throws Exception { - if (resumeStrategy != null) { - resumeStrategy.loadCache(); - - ResumeAdapter adapter = resumeStrategy.getAdapter(ResumeAdapter.class); - if (adapter != null) { - if (adapter instanceof ResumeActionAware) { - ResumeAction action = (ResumeAction) getEndpoint().getCamelContext().getRegistry() - .lookupByName(COUCHDB_RESUME_ACTION); - ObjectHelper.notNull(action, "The resume action cannot be null", this); - - ((ResumeActionAware) adapter).setResumeAction(action); - } - adapter.resume(); - } - } + ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this, resumeStrategy, COUCHDB_RESUME_ACTION); super.doStart(); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeStrategyHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeStrategyHelper.java new file mode 100644 index 00000000000..33b3fba7264 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeStrategyHelper.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.support.resume; + +import org.apache.camel.CamelContext; +import org.apache.camel.resume.ResumeAction; +import org.apache.camel.resume.ResumeActionAware; +import org.apache.camel.resume.ResumeAdapter; +import org.apache.camel.resume.ResumeStrategy; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for the resume strategy + */ +public final class ResumeStrategyHelper { + private static final Logger LOG = LoggerFactory.getLogger(ResumeStrategyHelper.class); + + private ResumeStrategyHelper() { + + } + + /** + * Executes the resume operation + * + * @param context a camel context on which the registry will be searched for the resume action + * @param on the calling instance for the resume operation + * @param resumeStrategy the instance of the {@link ResumeStrategy} to perform the resume + * @param actionName an action name that maps to a {@link ResumeAction} object in the registry + * @throws Exception if the strategy is unable to load the cache + */ + public static void resume( + CamelContext context, Object on, ResumeStrategy resumeStrategy, + String actionName) + throws Exception { + resume(context, on, resumeStrategy, actionName, ResumeAdapter.class); + } + + /** + * Executes the resume operation + * + * @param context a camel context on which the registry will be searched for the resume action + * @param on the calling instance for the resume operation + * @param resumeStrategy the instance of the {@link ResumeStrategy} to perform the resume + * @param actionName an action name that maps to a {@link ResumeAction} object in the registry + * @param adapterClass the class of the {@link ResumeAdapter} to look for in the registry + * @throws Exception if the strategy is unable to load the cache + */ + public static <T extends ResumeAdapter> void resume( + CamelContext context, Object on, ResumeStrategy resumeStrategy, + String actionName, Class<T> adapterClass) + throws Exception { + if (resumeStrategy == null) { + LOG.debug("Skipping resume operation because there's no resume strategy defined"); + + return; + } + + LOG.debug("Loading the resume cache"); + resumeStrategy.loadCache(); + + T resumeAdapter = resumeStrategy.getAdapter(adapterClass); + if (resumeAdapter == null) { + LOG.warn("The resume cannot be executed because no resume adapter was provided"); + + return; + } + + if (resumeAdapter instanceof ResumeActionAware) { + ResumeAction action = (ResumeAction) context.getRegistry().lookupByName(actionName); + ObjectHelper.notNull(action, "The resume action cannot be null", on); + + ((ResumeActionAware) resumeAdapter).setResumeAction(action); + } + + resumeAdapter.resume(); + } +}