This is an automated email from the ASF dual-hosted git repository. oalsafi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
     new 6c2e988  CAMEL-14777: Replace Debezium EmbeddedEngine instance with DebeziumEngine APIs (#3965)
6c2e988 is described below

commit 6c2e9881e085285c8ae289cab38c1956b46140a5
Author: Omar Al-Safi <omars...@gmail.com>
AuthorDate: Wed Jul 1 17:43:27 2020 +0200

    CAMEL-14777: Replace Debezium EmbeddedEngine instance with DebeziumEngine APIs (#3965)
---
 .../camel/component/debezium/DebeziumConsumer.java | 20 ++++++++++++--------
 components/camel-debezium-common/pom.xml           |  5 +++++
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
index b971dab..f73ed98 100644
--- a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
+++ b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
@@ -18,7 +18,9 @@ package org.apache.camel.component.debezium;
 
 import java.util.concurrent.ExecutorService;
 
-import io.debezium.embedded.EmbeddedEngine;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration;
@@ -31,7 +33,7 @@ public class DebeziumConsumer extends DefaultConsumer {
     private final EmbeddedDebeziumConfiguration configuration;
 
     private ExecutorService executorService;
-    private EmbeddedEngine dbzEngine;
+    private DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> dbzEngine;
 
     public DebeziumConsumer(DebeziumEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -55,7 +57,7 @@ public class DebeziumConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
-        dbzEngine.stop();
+        dbzEngine.close();
 
         // shutdown the thread pool gracefully
         getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
@@ -64,13 +66,15 @@ public class DebeziumConsumer extends DefaultConsumer {
         super.doStop();
     }
 
-    private EmbeddedEngine createDbzEngine() {
-        return EmbeddedEngine.create().using(configuration.createDebeziumConfiguration())
-                .notifying(this::onEventListener).build();
+    private DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> createDbzEngine() {
+        return DebeziumEngine.create(Connect.class)
+                .using(configuration.createDebeziumConfiguration().asProperties())
+                .notifying(this::onEventListener)
+                .build();
     }
 
-    private void onEventListener(final SourceRecord event) {
-        final Exchange exchange = endpoint.createDbzExchange(event);
+    private void onEventListener(final ChangeEvent<SourceRecord, SourceRecord> event) {
+        final Exchange exchange = endpoint.createDbzExchange(event.value());
 
         try {
             // send message to next processor in the route
diff --git a/components/camel-debezium-common/pom.xml b/components/camel-debezium-common/pom.xml
index b2263ef..8a0a9b7 100644
--- a/components/camel-debezium-common/pom.xml
+++ b/components/camel-debezium-common/pom.xml
@@ -48,6 +48,11 @@
             <!-- debezium embedded engine -->
             <dependency>
                 <groupId>io.debezium</groupId>
+                <artifactId>debezium-api</artifactId>
+                <version>${debezium-version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.debezium</groupId>
                 <artifactId>debezium-embedded</artifactId>
                 <version>${debezium-version}</version>
                 <exclusions>