This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a8dc096  [fix]Fix using debezium as kafka upstream data source (#11)
a8dc096 is described below

commit a8dc096af4bc381481b5ea6411bc7fd3a98cab8c
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Apr 10 16:00:15 2024 +0800

    [fix]Fix using debezium as kafka upstream data source (#11)
---
 .../doris/kafka/connector/cfg/DorisOptions.java    | 11 +++
 .../connector/cfg/DorisSinkConnectorConfig.java    |  3 +
 .../kafka/connector/converter/ConverterMode.java   | 41 +++++++++
 .../kafka/connector/converter/RecordService.java   | 35 +++++---
 .../doris/kafka/connector/writer/DorisWriter.java  |  2 +-
 .../connector/converter/TestRecordService.java     | 99 ++++++++++++++++------
 6 files changed, 149 insertions(+), 42 deletions(-)
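
For reference, the option added here surfaces as a sink connector property. A minimal sketch of a connector config that enables the new mode (the property name and default are taken from DorisSinkConnectorConfig in the diff below; the surrounding keys are illustrative):

    name=doris-sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=normal.wdl_test.test_sink_normal
    # "normal" (the default) keeps the pass-through behavior;
    # "debezium_ingestion" unwraps the Debezium change envelope.
    converter.mode=debezium_ingestion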

diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 528aaa3..c47ea1f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import org.apache.doris.kafka.connector.converter.ConverterMode;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -59,6 +60,7 @@ public class DorisOptions {
     private String labelPrefix;
     private LoadModel loadModel;
     private DeliveryGuarantee deliveryGuarantee;
+    private ConverterMode converterMode;
 
     public DorisOptions(Map<String, String> config) {
         this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -79,6 +81,11 @@ public class DorisOptions {
                         config.getOrDefault(
                                 DorisSinkConnectorConfig.DELIVERY_GUARANTEE,
                                 DorisSinkConnectorConfig.DELIVERY_GUARANTEE_DEFAULT));
+        this.converterMode =
+                ConverterMode.of(
+                        config.getOrDefault(
+                                DorisSinkConnectorConfig.CONVERT_MODE,
+                                DorisSinkConnectorConfig.CONVERT_MODE_DEFAULT));
 
         this.fileSize = Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES));
         this.recordNum =
@@ -260,6 +267,10 @@ public class DorisOptions {
         return this.deliveryGuarantee;
     }
 
+    public ConverterMode getConverterMode() {
+        return this.converterMode;
+    }
+
     public boolean isAutoRedirect() {
         return autoRedirect;
     }
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index ebc2d1f..5ca55ef 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -22,6 +22,7 @@ package org.apache.doris.kafka.connector.cfg;
 import java.time.Duration;
 import java.util.Map;
 import org.apache.doris.kafka.connector.DorisSinkConnector;
+import org.apache.doris.kafka.connector.converter.ConverterMode;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -73,6 +74,8 @@ public class DorisSinkConnectorConfig {
     public static final String AUTO_REDIRECT = "auto.redirect";
     public static final String DELIVERY_GUARANTEE = "delivery.guarantee";
     public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name();
+    public static final String CONVERT_MODE = "converter.mode";
+    public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName();
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java b/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
new file mode 100644
index 0000000..c40f789
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter;
+
+public enum ConverterMode {
+    NORMAL("normal"),
+
+    // kafka upstream data comes from debezium
+    DEBEZIUM_INGESTION("debezium_ingestion");
+
+    private final String name;
+
+    ConverterMode(String name) {
+        this.name = name;
+    }
+
+    public static ConverterMode of(String name) {
+        return ConverterMode.valueOf(name.toUpperCase());
+    }
+
+    public String getName() {
+        return name;
+    }
+}
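
A quick usage sketch of the lookup (illustrative caller, not part of the commit): of() upper-cases its argument and delegates to Enum.valueOf, so an unrecognized mode string surfaces as an IllegalArgumentException.

    ConverterMode mode = ConverterMode.of("debezium_ingestion"); // DEBEZIUM_INGESTION
    String name = ConverterMode.NORMAL.getName();                // "normal"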
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index d678d33..bd55297 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.exception.DataFormatException;
 import org.apache.doris.kafka.connector.writer.LoadConstants;
 import org.apache.doris.kafka.connector.writer.RecordBuffer;
@@ -41,6 +42,7 @@ public class RecordService {
     private static final Logger LOG = LoggerFactory.getLogger(RecordService.class);
     private static final ObjectMapper MAPPER = new ObjectMapper();
     private final JsonConverter converter;
+    private DorisOptions dorisOptions;
 
     public RecordService() {
         this.converter = new JsonConverter();
@@ -49,6 +51,11 @@ public class RecordService {
         this.converter.configure(converterConfig, false);
     }
 
+    public RecordService(DorisOptions dorisOptions) {
+        this();
+        this.dorisOptions = dorisOptions;
+    }
+
     /**
      * process struct record from debezium: { "schema": { "type": "struct", "fields": [ ...... ],
      * "optional": false, "name": "" }, "payload": { "name": "doris", "__deleted": "true" } }
@@ -57,20 +64,22 @@ public class RecordService {
         byte[] bytes =
                 converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
         String recordValue = new String(bytes, StandardCharsets.UTF_8);
-        try {
-            // delete sign sync
-            Map<String, Object> recordMap =
-                    MAPPER.readValue(recordValue, new TypeReference<Map<String, Object>>() {});
-            if (Boolean.parseBoolean(
-                    recordMap.getOrDefault(LoadConstants.DELETE_KET, false).toString())) {
-                recordMap.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_TRUE);
-            } else {
-                recordMap.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_FALSE);
+        if (ConverterMode.DEBEZIUM_INGESTION == dorisOptions.getConverterMode()) {
+            try {
+                Map<String, Object> recordMap =
+                        MAPPER.readValue(recordValue, new TypeReference<Map<String, Object>>() {});
+                // delete sign sync
+                if ("d".equals(recordMap.get("op"))) {
+                    Map<String, Object> beforeValue = (Map<String, Object>) recordMap.get("before");
+                    beforeValue.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_TRUE);
+                    return MAPPER.writeValueAsString(beforeValue);
+                }
+                Map<String, Object> afterValue = (Map<String, Object>) recordMap.get("after");
+                afterValue.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_FALSE);
+                return MAPPER.writeValueAsString(afterValue);
+            } catch (JsonProcessingException e) {
+                LOG.error("parse record failed, cause by parse json error: {}", recordValue);
             }
-            recordMap.remove(LoadConstants.DELETE_KET);
-            recordValue = MAPPER.writeValueAsString(recordMap);
-        } catch (JsonProcessingException e) {
-            LOG.error("Add delete sign failed, cause by parse json error: {}", recordValue);
         }
         return recordValue;
     }
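
To illustrate the new debezium_ingestion path (values mirror the test data below): an insert event such as

    {"before":null,"after":{"id":19,"name":"fff"},"op":"c",...}

is flattened to its "after" image with the delete sign cleared:

    {"id":19,"name":"fff","__DORIS_DELETE_SIGN__":"0"}

A delete event ("op":"d") instead keeps its "before" image and sets "__DORIS_DELETE_SIGN__":"1"; in the default "normal" mode the record value passes through unchanged.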
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 9fcffbc..29cf1ec 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -93,7 +93,7 @@ public abstract class DorisWriter {
 
         this.dorisOptions = dorisOptions;
         this.connectionProvider = connectionProvider;
-        this.recordService = new RecordService();
+        this.recordService = new RecordService(dorisOptions);
         this.connectMonitor = connectMonitor;
     }
 
diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 3de9c27..0804980 100644
--- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -22,59 +22,102 @@ package org.apache.doris.kafka.connector.converter;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.writer.TestRecordBuffer;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestRecordService {
 
-    private RecordService recordService = new RecordService();
+    private RecordService recordService;
+    private Properties props = new Properties();
+    private JsonConverter jsonConverter = new JsonConverter();
 
-    @Test
-    public void processStructRecord() {
-        JsonConverter jsonConverter = new JsonConverter();
+    @Before
+    public void init() throws IOException {
+        InputStream stream =
+                this.getClass()
+                        .getClassLoader()
+                        .getResourceAsStream("doris-connector-sink.properties");
+        props.load(stream);
+        props.put("task_id", "1");
+        props.put("converter.mode", "debezium_ingestion");
+        recordService = new RecordService(new DorisOptions((Map) props));
         HashMap<String, String> config = new HashMap<>();
         jsonConverter.configure(config, false);
+    }
+
+    @Test
+    public void processStructRecord() throws IOException {
+        props.remove("converter.mode");
+        recordService = new RecordService(new DorisOptions((Map) props));
+        String topic = "normal.wdl_test.test_sink_normal";
+
         // no delete value
-        String nodelete =
-                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}],\"optional\":false,\"name\":\"mysql_master.test.test_flink.Value\"},\"payload\":{\"name\":\"zhangsan\",\"age\":1,\"__deleted\":\"false\"}}";
-        SchemaAndValue nodeleteValue =
-                jsonConverter.toConnectData("test", nodelete.getBytes(StandardCharsets.UTF_8));
-        SinkRecord record =
-                TestRecordBuffer.newSinkRecord(nodeleteValue.value(), 1, nodeleteValue.schema());
-        String s = recordService.processStructRecord(record);
-        Assert.assertEquals("{\"name\":\"zhangsan\",\"age\":1,\"__DORIS_DELETE_SIGN__\":\"0\"}", s);
+        String noDeleteValue =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...]
+        SchemaAndValue noDeleteSchemaValue =
+                jsonConverter.toConnectData(topic, noDeleteValue.getBytes(StandardCharsets.UTF_8));
+        SinkRecord noDeleteSinkRecord =
+                TestRecordBuffer.newSinkRecord(
+                        noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema());
+        String noDeleteResult = recordService.processStructRecord(noDeleteSinkRecord);
+        Assert.assertEquals(
+                "{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}",
+                noDeleteResult);
 
         // delete value
-        String delete =
-                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}],\"optional\":false,\"name\":\"mysql_master.test.test_flink.Value\"},\"payload\":{\"name\":\"zhangsan\",\"age\":1,\"__deleted\":\"true\"}}";
-        SchemaAndValue deleteValue =
-                jsonConverter.toConnectData("test", delete.getBytes(StandardCharsets.UTF_8));
+        String deleteValue =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...]
+        SchemaAndValue deleteSchemaValue =
+                jsonConverter.toConnectData(topic, deleteValue.getBytes(StandardCharsets.UTF_8));
         SinkRecord record2 =
-                TestRecordBuffer.newSinkRecord(deleteValue.value(), 1, deleteValue.schema());
+                TestRecordBuffer.newSinkRecord(
+                        deleteSchemaValue.value(), 1, deleteSchemaValue.schema());
         String s2 = recordService.processStructRecord(record2);
         Assert.assertEquals(
-                "{\"name\":\"zhangsan\",\"age\":1,\"__DORIS_DELETE_SIGN__\":\"1\"}", s2);
-
-        // no setting delete sign, not set transforms.unwrap.delete.handling.mode=rewrite
-        String deletenone =
-                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"}],\"optional\":false,\"name\":\"mysql_master.test.test_flink.Value\"},\"payload\":{\"name\":\"zhangsan\",\"age\":1}}";
-        SchemaAndValue deletenoneValue =
-                jsonConverter.toConnectData("test", deletenone.getBytes(StandardCharsets.UTF_8));
-        SinkRecord record3 =
+                "{\"before\":{\"id\":24,\"name\":\"bb\"},\"after\":null,\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712545844000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5627,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"d\",\"ts_ms\":1712545844948,\"transaction\":null}",
+                s2);
+    }
+
+    @Test
+    public void processStructRecordWithDebeziumSchema() {
+        String topic = "normal.wdl_test.test_sink_normal";
+
+        // no delete value
+        String noDeleteValue =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...]
+        SchemaAndValue noDeleteSchemaValue =
+                jsonConverter.toConnectData(topic, noDeleteValue.getBytes(StandardCharsets.UTF_8));
+        SinkRecord noDeleteSinkRecord =
                 TestRecordBuffer.newSinkRecord(
-                        deletenoneValue.value(), 1, deletenoneValue.schema());
-        String s3 = recordService.processStructRecord(record3);
+                        noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema());
+        String noDeleteResult = recordService.processStructRecord(noDeleteSinkRecord);
         Assert.assertEquals(
-                "{\"name\":\"zhangsan\",\"age\":1,\"__DORIS_DELETE_SIGN__\":\"0\"}", s3);
+                "{\"id\":19,\"name\":\"fff\",\"__DORIS_DELETE_SIGN__\":\"0\"}", noDeleteResult);
+
+        // delete value
+        String deleteValue =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...]
+        SchemaAndValue deleteSchemaValue =
+                jsonConverter.toConnectData(topic, deleteValue.getBytes(StandardCharsets.UTF_8));
+        SinkRecord record2 =
+                TestRecordBuffer.newSinkRecord(
+                        deleteSchemaValue.value(), 1, deleteSchemaValue.schema());
+        String s2 = recordService.processStructRecord(record2);
+        Assert.assertEquals("{\"id\":24,\"name\":\"bb\",\"__DORIS_DELETE_SIGN__\":\"1\"}", s2);
     }
 
     @Test

