This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a763e0  [improve](cdc)improve compatibility with other databases by introducing an option to synchronize default values when inserting null data. (#361)
7a763e0 is described below

commit 7a763e0f97e05cad4509ea259fdc99a186d3017b
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Tue Apr 9 11:32:28 2024 +0800

    [improve](cdc)improve compatibility with other databases by introducing an option to synchronize default values when inserting null data. (#361)
---
 .../doris/flink/tools/cdc/oracle/OracleDatabaseSync.java   | 12 +++++++++---
 .../flink/tools/cdc/postgres/PostgresDatabaseSync.java     | 12 +++++++++---
 .../flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java   | 14 +++++++++++---
 .../doris/flink/tools/cdc/sqlserver/SqlServerType.java     |  2 ++
 4 files changed, 31 insertions(+), 9 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 945c839..89214fd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -28,10 +28,12 @@ import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
 import com.ververica.cdc.connectors.oracle.OracleSource;
 import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
 import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.slf4j.Logger;
@@ -175,9 +177,13 @@ public class OracleDatabaseSync extends DatabaseSync {
             }
         }
 
-        Map<String, Object> customConverterConfigs = new HashMap<>();
-        JsonDebeziumDeserializationSchema schema =
-                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+        DebeziumDeserializationSchema<String> schema;
+        if (ignoreDefaultValue) {
+            schema = new DorisJsonDebeziumDeserializationSchema();
+        } else {
+            Map<String, Object> customConverterConfigs = new HashMap<>();
+            schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+        }
 
         if (config.getBoolean(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
             JdbcIncrementalSource<String> incrSource =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index 2645d83..490fdbc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -28,10 +28,12 @@ import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
 import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
 import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
 import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.slf4j.Logger;
@@ -160,9 +162,13 @@ public class PostgresDatabaseSync extends DatabaseSync {
             }
         }
 
-        Map<String, Object> customConverterConfigs = new HashMap<>();
-        JsonDebeziumDeserializationSchema schema =
-                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+        DebeziumDeserializationSchema<String> schema;
+        if (ignoreDefaultValue) {
+            schema = new DorisJsonDebeziumDeserializationSchema();
+        } else {
+            Map<String, Object> customConverterConfigs = new HashMap<>();
+            schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+        }
 
         if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
             JdbcIncrementalSource<String> incrSource =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index f4d6ba3..9f286ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -26,12 +26,15 @@ import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
 import com.ververica.cdc.connectors.base.options.SourceOptions;
 import com.ververica.cdc.connectors.base.options.StartupOptions;
 import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
 import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
 import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.slf4j.Logger;
@@ -155,9 +158,14 @@ public class SqlServerDatabaseSync extends DatabaseSync {
             }
         }
 
-        Map<String, Object> customConverterConfigs = new HashMap<>();
-        JsonDebeziumDeserializationSchema schema =
-                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+        DebeziumDeserializationSchema<String> schema;
+        if (ignoreDefaultValue) {
+            schema = new DorisJsonDebeziumDeserializationSchema();
+        } else {
+            Map<String, Object> customConverterConfigs = new HashMap<>();
+            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+            schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+        }
 
         if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
             JdbcIncrementalSource<String> incrSource =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
index 6c92ae4..ff37c06 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
@@ -41,6 +41,7 @@ public class SqlServerType {
     private static final String NVARCHAR = "nvarchar";
     private static final String TEXT = "text";
     private static final String NTEXT = "ntext";
+    private static final String XML = "xml";
     private static final String UNIQUEIDENTIFIER = "uniqueidentifier";
     private static final String TIME = "time";
     private static final String TIMESTAMP = "timestamp";
@@ -104,6 +105,7 @@ public class SqlServerType {
             case UNIQUEIDENTIFIER:
             case BINARY:
             case VARBINARY:
+            case XML:
                 return DorisType.STRING;
             default:
                 throw new UnsupportedOperationException(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to