This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7a763e0 [improve](cdc)improve compatibility with other databases by introducing an option to synchronize default values when inserting null data. (#361) 7a763e0 is described below commit 7a763e0f97e05cad4509ea259fdc99a186d3017b Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Apr 9 11:32:28 2024 +0800 [improve](cdc)improve compatibility with other databases by introducing an option to synchronize default values when inserting null data. (#361) --- .../doris/flink/tools/cdc/oracle/OracleDatabaseSync.java | 12 +++++++++--- .../flink/tools/cdc/postgres/PostgresDatabaseSync.java | 12 +++++++++--- .../flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java | 14 +++++++++++--- .../doris/flink/tools/cdc/sqlserver/SqlServerType.java | 2 ++ 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 945c839..89214fd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -28,10 +28,12 @@ import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; import com.ververica.cdc.connectors.oracle.OracleSource; import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder; import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; +import 
org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.slf4j.Logger; @@ -175,9 +177,13 @@ public class OracleDatabaseSync extends DatabaseSync { } } - Map<String, Object> customConverterConfigs = new HashMap<>(); - JsonDebeziumDeserializationSchema schema = - new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + DebeziumDeserializationSchema<String> schema; + if (ignoreDefaultValue) { + schema = new DorisJsonDebeziumDeserializationSchema(); + } else { + Map<String, Object> customConverterConfigs = new HashMap<>(); + schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + } if (config.getBoolean(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) { JdbcIncrementalSource<String> incrSource = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java index 2645d83..490fdbc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java @@ -28,10 +28,12 @@ import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; import com.ververica.cdc.connectors.postgres.PostgreSQLSource; import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder; import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; +import 
org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.slf4j.Logger; @@ -160,9 +162,13 @@ public class PostgresDatabaseSync extends DatabaseSync { } } - Map<String, Object> customConverterConfigs = new HashMap<>(); - JsonDebeziumDeserializationSchema schema = - new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + DebeziumDeserializationSchema<String> schema; + if (ignoreDefaultValue) { + schema = new DorisJsonDebeziumDeserializationSchema(); + } else { + Map<String, Object> customConverterConfigs = new HashMap<>(); + schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + } if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) { JdbcIncrementalSource<String> incrSource = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java index f4d6ba3..9f286ff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -26,12 +26,15 @@ import com.ververica.cdc.connectors.base.options.JdbcSourceOptions; import com.ververica.cdc.connectors.base.options.SourceOptions; import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; import com.ververica.cdc.connectors.sqlserver.SqlServerSource; import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import 
com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.slf4j.Logger; @@ -155,9 +158,14 @@ public class SqlServerDatabaseSync extends DatabaseSync { } } - Map<String, Object> customConverterConfigs = new HashMap<>(); - JsonDebeziumDeserializationSchema schema = - new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + DebeziumDeserializationSchema<String> schema; + if (ignoreDefaultValue) { + schema = new DorisJsonDebeziumDeserializationSchema(); + } else { + Map<String, Object> customConverterConfigs = new HashMap<>(); + customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); + schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + } if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) { JdbcIncrementalSource<String> incrSource = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java index 6c92ae4..ff37c06 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java @@ -41,6 +41,7 @@ public class SqlServerType { private static final String NVARCHAR = "nvarchar"; private static final String TEXT = "text"; private static final String NTEXT = "ntext"; + private static final String XML = "xml"; private static final String UNIQUEIDENTIFIER = "uniqueidentifier"; private static final String TIME = 
"time"; private static final String TIMESTAMP = "timestamp"; @@ -104,6 +105,7 @@ public class SqlServerType { case UNIQUEIDENTIFIER: case BINARY: case VARBINARY: + case XML: return DorisType.STRING; default: throw new UnsupportedOperationException( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org