This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-examples.git
The following commit(s) were added to refs/heads/main by this push: new 23787071 CAMEL-18677: allow setting the cache on the configuration builder 23787071 is described below commit 237870719af4b790f5875156e94b16d76e0e2df9 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Thu Nov 3 18:22:07 2022 +0100 CAMEL-18677: allow setting the cache on the configuration builder --- .../example/resume/aws/kinesis/main/KinesisRoute.java | 3 +-- .../example/resume/cassandra/main/CassandraRoute.java | 15 +++++---------- .../camel/example/resume/cassandra/main/MainApp.java | 6 ++++-- .../strategies/kafka/file/LargeFileRouteBuilder.java | 6 +++--- .../kafka/fileset/LargeDirectoryRouteBuilder.java | 4 +--- .../strategies/ClusterizedLargeDirectoryRouteBuilder.java | 6 +++--- 6 files changed, 17 insertions(+), 23 deletions(-) diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java index cd097042..a7e05c26 100644 --- a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java +++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java @@ -55,13 +55,12 @@ public class KinesisRoute extends RouteBuilder { @Override public void configure() { - bindToRegistry(ResumeCache.DEFAULT_NAME, resumeCache); bindToRegistry("amazonKinesisClient", client); String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient"; fromF(kinesisEndpointUri, streamName) - .resumable().configuration(KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder()) + .resumable().configuration(KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder().withResumeCache(resumeCache)) .process(this::addResumeOffset); } } diff --git 
a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java index 3f90c7b4..1bf1c25e 100644 --- a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java +++ b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java @@ -23,9 +23,8 @@ import java.util.concurrent.CountDownLatch; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.cassandra.CassandraConstants; +import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder; import org.apache.camel.resume.ResumeAction; -import org.apache.camel.resume.ResumeStrategy; -import org.apache.camel.resume.cache.ResumeCache; import org.apache.camel.support.resume.Resumables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,15 +33,13 @@ public class CassandraRoute extends RouteBuilder { private static final Logger LOG = LoggerFactory.getLogger(CassandraRoute.class); private final CountDownLatch latch; private final int batchSize; - private final ResumeStrategy resumeStrategy; - private final ResumeCache<?> resumeCache; private final CassandraClient client; + private final KafkaResumeStrategyConfigurationBuilder configurationBuilder; - public CassandraRoute(CountDownLatch latch, int batchSize, ResumeStrategy resumeStrategy, ResumeCache<?> resumeCache, CassandraClient client) { + public CassandraRoute(CountDownLatch latch, int batchSize, KafkaResumeStrategyConfigurationBuilder configurationBuilder, CassandraClient client) { this.latch = latch; this.batchSize = batchSize; - this.resumeStrategy = resumeStrategy; - this.resumeCache = resumeCache; + this.configurationBuilder = configurationBuilder; this.client = 
client; } @@ -78,15 +75,13 @@ public class CassandraRoute extends RouteBuilder { @Override public void configure() { - bindToRegistry(ResumeStrategy.DEFAULT_NAME, resumeStrategy); - bindToRegistry(ResumeCache.DEFAULT_NAME, resumeCache); bindToRegistry(CassandraConstants.CASSANDRA_RESUME_ACTION, new CustomResumeAction(client.newExampleDao())); fromF("cql:{{cassandra.host}}:{{cassandra.cql3.port}}/camel_ks?cql=%s&resultSetConversionStrategy=#class:%s", ExampleDao.getSelectStatement(batchSize), ExampleResultSetConversionStrategy.class.getName()) .split(body()) // We receive a list of records so, for each .resumable() - .resumeStrategy(ResumeStrategy.DEFAULT_NAME) + .configuration(configurationBuilder) .intermittent(true) // Set to ignore empty data sets that will generate exchanges w/ no offset information .process(this::addResumeInfo); diff --git a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java index 6606b3c6..6718d38f 100644 --- a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java +++ b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; import org.apache.camel.component.caffeine.resume.CaffeineCache; import org.apache.camel.example.resume.strategies.kafka.KafkaUtil; import org.apache.camel.main.Main; +import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder; import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy; public class MainApp { @@ -47,7 +48,7 @@ public class MainApp { } // Normal code path for consuming from Cassandra - SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy(); + final 
KafkaResumeStrategyConfigurationBuilder configurationBuilder = KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder(); Main main = new Main(); @@ -57,7 +58,8 @@ public class MainApp { try (CassandraClient client = new CassandraClient(host, port)) { - main.configure().addRoutesBuilder(new CassandraRoute(latch, batchSize, resumeStrategy, new CaffeineCache<>(10240), client)); + configurationBuilder.withResumeCache(new CaffeineCache<>(10240)); + main.configure().addRoutesBuilder(new CassandraRoute(latch, batchSize, configurationBuilder, client)); main.start(); } } diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java index 0ad7e275..b3c24e40 100644 --- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java +++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java @@ -85,9 +85,9 @@ public class LargeFileRouteBuilder extends RouteBuilder { public void configure() { producerTemplate = getContext().createProducerTemplate(); - getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache); - - final KafkaResumeStrategyConfigurationBuilder defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder(); + final KafkaResumeStrategyConfigurationBuilder defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil + .getDefaultKafkaResumeStrategyConfigurationBuilder() + .withResumeCache(cache); from("file:{{input.dir}}?noop=true&fileName={{input.file}}") .routeId("largeFileRoute") diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java index c85b2ca3..4482171e 100644 --- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java +++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java @@ -63,10 +63,8 @@ public class LargeDirectoryRouteBuilder extends RouteBuilder { * Let's configure the Camel routing rules using Java code... */ public void configure() { - getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache); - from("file:{{input.dir}}?noop=true&recursive=true") - .resumable().configuration(resumeStrategyConfigurationBuilder) + .resumable().configuration(resumeStrategyConfigurationBuilder.withResumeCache(cache)) .process(this::process) .to("file:{{output.dir}}"); } diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java index 407c9105..4c57da1f 100644 --- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java +++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java @@ -49,9 +49,9 @@ public class ClusterizedLargeDirectoryRouteBuilder extends RouteBuilder { * Let's configure the Camel routing rules using Java code... 
*/ public void configure() { - getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, new CaffeineCache<>(10000)); - - final KafkaResumeStrategyConfigurationBuilder defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder(); + final KafkaResumeStrategyConfigurationBuilder defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil + .getDefaultKafkaResumeStrategyConfigurationBuilder() + .withResumeCache(new CaffeineCache<>(10000)); from("timer:heartbeat?period=10000") .routeId("heartbeat")