This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 54c94a9  [fix](cdc) add Oracle table name validation (#320)
54c94a9 is described below

commit 54c94a9e19cca392a622674af5144465413e973d
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Mon Feb 26 15:36:28 2024 +0800

    [fix](cdc) add Oracle table name validation (#320)
---
 .../org/apache/doris/flink/catalog/doris/TableSchema.java     |  1 +
 .../doris/flink/tools/cdc/oracle/OracleDatabaseSync.java      | 11 +++++++++++
 .../flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java      |  2 +-
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
index f3da962..4cc9098 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 public class TableSchema {
+    public static final String DORIS_TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$";
     private String database;
     private String table;
     private String tableComment;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index ef2e7ac..8ca66e4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -32,6 +32,8 @@ import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.CreateTableException;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.slf4j.Logger;
@@ -118,6 +120,15 @@ public class OracleDatabaseSync extends DatabaseSync {
                     if (!isSyncNeeded(tableName)) {
                         continue;
                     }
+                    // Oracle allows table names to contain special characters such as /, #, $,
+                    // etc., as in 'A/B'.
+                    // However, Doris does not support tables with these characters.
+                    if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) {
+                        throw new CreateTableException(
+                                String.format(
+                                        "The table name %s is invalid. Table names in Doris must match the regex pattern %s. Please consider renaming the table or use the 'excluding-tables' option to filter it out.",
+                                        tableName, TableSchema.DORIS_TABLE_REGEX));
+                    }
                     SourceSchema sourceSchema =
                             new OracleSchema(
                                     metaData, databaseName, schemaName, tableName, tableComment);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index da96f08..f4d6ba3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -125,7 +125,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
         String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
         String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
         Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required");
-        Preconditions.checkNotNull(databaseName, "schema-name in sqlserver is required");
+        Preconditions.checkNotNull(schemaName, "schema-name in sqlserver is required");
 
         String tableName = config.get(JdbcSourceOptions.TABLE_NAME);
         String hostname = config.get(JdbcSourceOptions.HOSTNAME);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to