This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 54c94a9  [fix](cdc) add Oracle table name validation (#320)
54c94a9 is described below

commit 54c94a9e19cca392a622674af5144465413e973d
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Mon Feb 26 15:36:28 2024 +0800

    [fix](cdc) add Oracle table name validation (#320)
---
 .../org/apache/doris/flink/catalog/doris/TableSchema.java |  1 +
 .../doris/flink/tools/cdc/oracle/OracleDatabaseSync.java  | 11 +++++++++++
 .../flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java  |  2 +-
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
index f3da962..4cc9098 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 public class TableSchema {
+    public static final String DORIS_TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$";
     private String database;
     private String table;
     private String tableComment;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index ef2e7ac..8ca66e4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -32,6 +32,8 @@ import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.CreateTableException;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.slf4j.Logger;
@@ -118,6 +120,15 @@ public class OracleDatabaseSync extends DatabaseSync {
                     if (!isSyncNeeded(tableName)) {
                         continue;
                     }
+                    // Oracle allows table names to contain special characters such as /, #, $,
+                    // etc., as in 'A/B'.
+                    // However, Doris does not support tables with these characters.
+                    if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) {
+                        throw new CreateTableException(
+                                String.format(
+                                        "The table name %s is invalid. Table names in Doris must match the regex pattern %s. Please consider renaming the table or use the 'excluding-tables' option to filter it out.",
+                                        tableName, TableSchema.DORIS_TABLE_REGEX));
+                    }
                     SourceSchema sourceSchema =
                             new OracleSchema(
                                     metaData, databaseName, schemaName, tableName, tableComment);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index da96f08..f4d6ba3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -125,7 +125,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
         String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
         String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
         Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required");
-        Preconditions.checkNotNull(databaseName, "schema-name in sqlserver is required");
+        Preconditions.checkNotNull(schemaName, "schema-name in sqlserver is required");
 
         String tableName = config.get(JdbcSourceOptions.TABLE_NAME);
         String hostname = config.get(JdbcSourceOptions.HOSTNAME);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org