This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new a8dc096  [fix]Fix using debezium as kafka upstream data source (#11)
a8dc096 is described below

commit a8dc096af4bc381481b5ea6411bc7fd3a98cab8c
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Apr 10 16:00:15 2024 +0800

    [fix]Fix using debezium as kafka upstream data source (#11)
---
 .../doris/kafka/connector/cfg/DorisOptions.java    | 11 +++
 .../connector/cfg/DorisSinkConnectorConfig.java    |  3 +
 .../kafka/connector/converter/ConverterMode.java   | 41 +++++++++
 .../kafka/connector/converter/RecordService.java   | 35 +++++---
 .../doris/kafka/connector/writer/DorisWriter.java  |  2 +-
 .../connector/converter/TestRecordService.java     | 99 ++++++++++++++++------
 6 files changed, 149 insertions(+), 42 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 528aaa3..c47ea1f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import org.apache.doris.kafka.connector.converter.ConverterMode;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -59,6 +60,7 @@ public class DorisOptions {
     private String labelPrefix;
     private LoadModel loadModel;
     private DeliveryGuarantee deliveryGuarantee;
+    private ConverterMode converterMode;
 
     public DorisOptions(Map<String, String> config) {
         this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -79,6 +81,11 @@ public class DorisOptions {
                 config.getOrDefault(
                         DorisSinkConnectorConfig.DELIVERY_GUARANTEE,
                         DorisSinkConnectorConfig.DELIVERY_GUARANTEE_DEFAULT));
+        this.converterMode =
+                ConverterMode.of(
+                        config.getOrDefault(
+                                DorisSinkConnectorConfig.CONVERT_MODE,
+                                DorisSinkConnectorConfig.CONVERT_MODE_DEFAULT));
         this.fileSize = Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES));
         this.recordNum =
@@ -260,6 +267,10 @@ public class DorisOptions {
         return this.deliveryGuarantee;
     }
 
+    public ConverterMode getConverterMode() {
+        return this.converterMode;
+    }
+
     public boolean isAutoRedirect() {
         return autoRedirect;
     }
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index ebc2d1f..5ca55ef 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -22,6 +22,7 @@ package org.apache.doris.kafka.connector.cfg;
 import java.time.Duration;
 import java.util.Map;
 import org.apache.doris.kafka.connector.DorisSinkConnector;
+import org.apache.doris.kafka.connector.converter.ConverterMode;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -73,6 +74,8 @@ public class DorisSinkConnectorConfig {
     public static final String AUTO_REDIRECT = "auto.redirect";
     public static final String DELIVERY_GUARANTEE = "delivery.guarantee";
     public static final String DELIVERY_GUARANTEE_DEFAULT =
             DeliveryGuarantee.AT_LEAST_ONCE.name();
+    public static final String CONVERT_MODE = "converter.mode";
+    public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName();
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
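With the two keys above in place, mode resolution is a plain getOrDefault-plus-valueOf lookup. For illustration, a minimal standalone sketch of that lookup (the demo class name is hypothetical; the constants and ConverterMode come from the connector itself):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
    import org.apache.doris.kafka.connector.converter.ConverterMode;

    // Hypothetical demo class, for illustration only.
    public class ConverterModeResolutionSketch {
        public static void main(String[] args) {
            Map<String, String> config = new HashMap<>();
            // Opt in to Debezium envelope handling; omit the key to keep "normal".
            config.put(DorisSinkConnectorConfig.CONVERT_MODE, "debezium_ingestion");

            // The same lookup the DorisOptions constructor above performs.
            ConverterMode mode =
                    ConverterMode.of(
                            config.getOrDefault(
                                    DorisSinkConnectorConfig.CONVERT_MODE,
                                    DorisSinkConnectorConfig.CONVERT_MODE_DEFAULT));
            System.out.println(mode); // DEBEZIUM_INGESTION
        }
    }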
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java b/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
new file mode 100644
index 0000000..c40f789
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter;
+
+public enum ConverterMode {
+    NORMAL("normal"),
+
+    // kafka upstream data comes from debezium
+    DEBEZIUM_INGESTION("debezium_ingestion");
+
+    private final String name;
+
+    ConverterMode(String name) {
+        this.name = name;
+    }
+
+    public static ConverterMode of(String name) {
+        return ConverterMode.valueOf(name.toUpperCase());
+    }
+
+    public String getName() {
+        return name;
+    }
+}
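ConverterMode.of upper-cases its argument and delegates to Enum.valueOf, so config values are case-insensitive while typos fail fast with an IllegalArgumentException. A quick sketch (class name hypothetical):

    import org.apache.doris.kafka.connector.converter.ConverterMode;

    // Hypothetical demo class, for illustration only.
    public class ConverterModeOfSketch {
        public static void main(String[] args) {
            System.out.println(ConverterMode.of("debezium_ingestion")); // DEBEZIUM_INGESTION
            System.out.println(ConverterMode.of("NORMAL"));             // NORMAL
            try {
                ConverterMode.of("debezium"); // no such constant
            } catch (IllegalArgumentException e) {
                // Enum.valueOf rejects unknown names, surfacing config typos early.
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }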
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index d678d33..bd55297 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.exception.DataFormatException;
 import org.apache.doris.kafka.connector.writer.LoadConstants;
 import org.apache.doris.kafka.connector.writer.RecordBuffer;
@@ -41,6 +42,7 @@ public class RecordService {
     private static final Logger LOG = LoggerFactory.getLogger(RecordService.class);
     private static final ObjectMapper MAPPER = new ObjectMapper();
     private final JsonConverter converter;
+    private DorisOptions dorisOptions;
 
     public RecordService() {
         this.converter = new JsonConverter();
@@ -49,6 +51,11 @@ public class RecordService {
         this.converter.configure(converterConfig, false);
     }
 
+    public RecordService(DorisOptions dorisOptions) {
+        this();
+        this.dorisOptions = dorisOptions;
+    }
+
     /**
      * process struct record from debezium: { "schema": { "type": "struct", "fields": [ ...... ],
      * "optional": false, "name": "" }, "payload": { "name": "doris", "__deleted": "true" } }
@@ -57,20 +64,22 @@ public class RecordService {
         byte[] bytes =
                 converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
         String recordValue = new String(bytes, StandardCharsets.UTF_8);
-        try {
-            // delete sign sync
-            Map<String, Object> recordMap =
-                    MAPPER.readValue(recordValue, new TypeReference<Map<String, Object>>() {});
-            if (Boolean.parseBoolean(
-                    recordMap.getOrDefault(LoadConstants.DELETE_KET, false).toString())) {
-                recordMap.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_TRUE);
-            } else {
-                recordMap.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_FALSE);
+        if (ConverterMode.DEBEZIUM_INGESTION == dorisOptions.getConverterMode()) {
+            try {
+                Map<String, Object> recordMap =
+                        MAPPER.readValue(recordValue, new TypeReference<Map<String, Object>>() {});
+                // delete sign sync
+                if ("d".equals(recordMap.get("op"))) {
+                    Map<String, Object> beforeValue = (Map<String, Object>) recordMap.get("before");
+                    beforeValue.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_TRUE);
+                    return MAPPER.writeValueAsString(beforeValue);
+                }
+                Map<String, Object> afterValue = (Map<String, Object>) recordMap.get("after");
+                afterValue.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_FALSE);
+                return MAPPER.writeValueAsString(afterValue);
+            } catch (JsonProcessingException e) {
+                LOG.error("parse record failed, caused by JSON parse error: {}", recordValue);
             }
-            recordMap.remove(LoadConstants.DELETE_KET);
-            recordValue = MAPPER.writeValueAsString(recordMap);
-        } catch (JsonProcessingException e) {
-            LOG.error("Add delete sign failed, cause by parse json error: {}", recordValue);
         }
         return recordValue;
     }
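The new branch only rewrites records when the connector runs in DEBEZIUM_INGESTION mode: a delete event ("op": "d") flattens the Debezium "before" image and sets the Doris delete sign to 1, while every other event flattens "after" with the sign set to 0. A self-contained sketch of that flattening with plain Jackson (demo class name hypothetical; the string literals stand in for LoadConstants in the connector):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    // Hypothetical demo class mirroring the DEBEZIUM_INGESTION branch above.
    public class DebeziumFlattenSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        @SuppressWarnings("unchecked")
        static String flatten(String envelopeJson) throws Exception {
            Map<String, Object> envelope =
                    MAPPER.readValue(envelopeJson, new TypeReference<Map<String, Object>>() {});
            if ("d".equals(envelope.get("op"))) {
                // Delete: load the pre-image and mark it deleted for Doris.
                Map<String, Object> before = (Map<String, Object>) envelope.get("before");
                before.put("__DORIS_DELETE_SIGN__", "1"); // LoadConstants.DORIS_DEL_TRUE
                return MAPPER.writeValueAsString(before);
            }
            // Create/update/read: load the post-image.
            Map<String, Object> after = (Map<String, Object>) envelope.get("after");
            after.put("__DORIS_DELETE_SIGN__", "0"); // LoadConstants.DORIS_DEL_FALSE
            return MAPPER.writeValueAsString(after);
        }

        public static void main(String[] args) throws Exception {
            String insert = "{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"op\":\"c\"}";
            String delete = "{\"before\":{\"id\":24,\"name\":\"bb\"},\"after\":null,\"op\":\"d\"}";
            System.out.println(flatten(insert)); // {"id":19,"name":"fff","__DORIS_DELETE_SIGN__":"0"}
            System.out.println(flatten(delete)); // {"id":24,"name":"bb","__DORIS_DELETE_SIGN__":"1"}
        }
    }

The tests below assert exactly these flattened shapes in debezium_ingestion mode, and envelope pass-through in normal mode.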
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 9fcffbc..29cf1ec 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -93,7 +93,7 @@ public abstract class DorisWriter {
         this.dorisOptions = dorisOptions;
         this.connectionProvider = connectionProvider;
-        this.recordService = new RecordService();
+        this.recordService = new RecordService(dorisOptions);
         this.connectMonitor = connectMonitor;
     }
diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 3de9c27..0804980 100644
--- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -22,59 +22,102 @@ package org.apache.doris.kafka.connector.converter;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.writer.TestRecordBuffer;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestRecordService {
 
-    private RecordService recordService = new RecordService();
+    private RecordService recordService;
+    private Properties props = new Properties();
+    private JsonConverter jsonConverter = new JsonConverter();
 
-    @Test
-    public void processStructRecord() {
-        JsonConverter jsonConverter = new JsonConverter();
+    @Before
+    public void init() throws IOException {
+        InputStream stream =
+                this.getClass()
+                        .getClassLoader()
+                        .getResourceAsStream("doris-connector-sink.properties");
+        props.load(stream);
+        props.put("task_id", "1");
+        props.put("converter.mode", "debezium_ingestion");
+        recordService = new RecordService(new DorisOptions((Map) props));
         HashMap<String, String> config = new HashMap<>();
         jsonConverter.configure(config, false);
+    }
+
+    @Test
+    public void processStructRecord() throws IOException {
+        props.remove("converter.mode");
+        recordService = new RecordService(new DorisOptions((Map) props));
+        String topic = "normal.wdl_test.test_sink_normal";
+
         // no delete value
-        String nodelete =
-                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}],\"optional\":false,\"name\":\"mysql_master.test.test_flink.Value\"},\"payload\":{\"name\":\"zhangsan\",\"age\":1,\"__deleted\":\"false\"}}";
-        SchemaAndValue nodeleteValue =
-                jsonConverter.toConnectData("test", nodelete.getBytes(StandardCharsets.UTF_8));
-        SinkRecord record =
-                TestRecordBuffer.newSinkRecord(nodeleteValue.value(), 1, nodeleteValue.schema());
-        String s = recordService.processStructRecord(record);
-        Assert.assertEquals("{\"name\":\"zhangsan\",\"age\":1,\"__DORIS_DELETE_SIGN__\":\"0\"}", s);
+        String noDeleteValue =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...]
+        SchemaAndValue noDeleteSchemaValue =
+                jsonConverter.toConnectData(topic, noDeleteValue.getBytes(StandardCharsets.UTF_8));
+        SinkRecord noDeleteSinkRecord =
+                TestRecordBuffer.newSinkRecord(
+                        noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema());
+        String noDeleteResult = recordService.processStructRecord(noDeleteSinkRecord);
+        Assert.assertEquals(
+                "{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}",
+                noDeleteResult);
 
         // delete value
-        String delete =
-                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}],\"optional\":false,\"name\":\"mysql_master.test.test_flink.Value\"},\"payload\":{\"name\":\"zhangsan\",\"age\":1,\"__deleted\":\"true\"}}";
-        SchemaAndValue deleteValue =
-                jsonConverter.toConnectData("test", delete.getBytes(StandardCharsets.UTF_8));
+        String deleteValue =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...]
+        SchemaAndValue deleteSchemaValue =
+                jsonConverter.toConnectData(topic, deleteValue.getBytes(StandardCharsets.UTF_8));
         SinkRecord record2 =
-                TestRecordBuffer.newSinkRecord(deleteValue.value(), 1, deleteValue.schema());
+                TestRecordBuffer.newSinkRecord(
+                        deleteSchemaValue.value(), 1, deleteSchemaValue.schema());
         String s2 = recordService.processStructRecord(record2);
         Assert.assertEquals(
-                "{\"name\":\"zhangsan\",\"age\":1,\"__DORIS_DELETE_SIGN__\":\"1\"}", s2);
-
-        // no setting delete sign, not set transforms.unwrap.delete.handling.mode=rewrite
-        String deletenone =
-                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"}],\"optional\":false,\"name\":\"mysql_master.test.test_flink.Value\"},\"payload\":{\"name\":\"zhangsan\",\"age\":1}}";
-        SchemaAndValue deletenoneValue =
-                jsonConverter.toConnectData("test", deletenone.getBytes(StandardCharsets.UTF_8));
-        SinkRecord record3 =
+                "{\"before\":{\"id\":24,\"name\":\"bb\"},\"after\":null,\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712545844000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5627,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"d\",\"ts_ms\":1712545844948,\"transaction\":null}",
+                s2);
+    }
+
+    @Test
+    public void processStructRecordWithDebeziumSchema() {
+        String topic = "normal.wdl_test.test_sink_normal";
+
+        // no delete value
+        String noDeleteValue =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...]
+        SchemaAndValue noDeleteSchemaValue =
+                jsonConverter.toConnectData(topic, noDeleteValue.getBytes(StandardCharsets.UTF_8));
+        SinkRecord noDeleteSinkRecord =
                 TestRecordBuffer.newSinkRecord(
-                        deletenoneValue.value(), 1, deletenoneValue.schema());
-        String s3 = recordService.processStructRecord(record3);
+                        noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema());
+        String noDeleteResult = recordService.processStructRecord(noDeleteSinkRecord);
         Assert.assertEquals(
-                "{\"name\":\"zhangsan\",\"age\":1,\"__DORIS_DELETE_SIGN__\":\"0\"}", s3);
+                "{\"id\":19,\"name\":\"fff\",\"__DORIS_DELETE_SIGN__\":\"0\"}", noDeleteResult);
+
+        // delete value
+        String deleteValue =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...]
+        SchemaAndValue deleteSchemaValue =
+                jsonConverter.toConnectData(topic, deleteValue.getBytes(StandardCharsets.UTF_8));
         SinkRecord record2 =
+                TestRecordBuffer.newSinkRecord(
+                        deleteSchemaValue.value(), 1, deleteSchemaValue.schema());
+        String s2 = recordService.processStructRecord(record2);
+        Assert.assertEquals("{\"id\":24,\"name\":\"bb\",\"__DORIS_DELETE_SIGN__\":\"1\"}", s2);
     }
 
     @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org