This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c2e988  CAMEL-14777: Replace Debezium EmbeddedEngine instance with DebeziumEngine APIs (#3965)
6c2e988 is described below

commit 6c2e9881e085285c8ae289cab38c1956b46140a5
Author: Omar Al-Safi <omars...@gmail.com>
AuthorDate: Wed Jul 1 17:43:27 2020 +0200

    CAMEL-14777: Replace Debezium EmbeddedEngine instance with DebeziumEngine APIs (#3965)
---
 .../camel/component/debezium/DebeziumConsumer.java   | 20 ++++++++++++--------
 components/camel-debezium-common/pom.xml             |  5 +++++
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
index b971dab..f73ed98 100644
--- a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
+++ b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
@@ -18,7 +18,9 @@ package org.apache.camel.component.debezium;
 
 import java.util.concurrent.ExecutorService;
 
-import io.debezium.embedded.EmbeddedEngine;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration;
@@ -31,7 +33,7 @@ public class DebeziumConsumer extends DefaultConsumer {
     private final EmbeddedDebeziumConfiguration configuration;
 
     private ExecutorService executorService;
-    private EmbeddedEngine dbzEngine;
+    private DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> dbzEngine;
 
     public DebeziumConsumer(DebeziumEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -55,7 +57,7 @@ public class DebeziumConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
-        dbzEngine.stop();
+        dbzEngine.close();
 
         // shutdown the thread pool gracefully
         getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
@@ -64,13 +66,15 @@ public class DebeziumConsumer extends DefaultConsumer {
         super.doStop();
     }
 
-    private EmbeddedEngine createDbzEngine() {
-        return EmbeddedEngine.create().using(configuration.createDebeziumConfiguration())
-            .notifying(this::onEventListener).build();
+    private DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> createDbzEngine() {
+        return DebeziumEngine.create(Connect.class)
+                .using(configuration.createDebeziumConfiguration().asProperties())
+                .notifying(this::onEventListener)
+                .build();
     }
 
-    private void onEventListener(final SourceRecord event) {
-        final Exchange exchange = endpoint.createDbzExchange(event);
+    private void onEventListener(final ChangeEvent<SourceRecord, SourceRecord> event) {
+        final Exchange exchange = endpoint.createDbzExchange(event.value());
 
         try {
             // send message to next processor in the route
diff --git a/components/camel-debezium-common/pom.xml b/components/camel-debezium-common/pom.xml
index b2263ef..8a0a9b7 100644
--- a/components/camel-debezium-common/pom.xml
+++ b/components/camel-debezium-common/pom.xml
@@ -48,6 +48,11 @@
         <!-- debezium embedded engine -->
         <dependency>
             <groupId>io.debezium</groupId>
+            <artifactId>debezium-api</artifactId>
+            <version>${debezium-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
             <artifactId>debezium-embedded</artifactId>
             <version>${debezium-version}</version>
             <exclusions>

Reply via email to