This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new da648ac  [fix](oracle) add oracle regexp like compatible (#194)
da648ac is described below

commit da648ac4c1a53bb5769b1fc193e054f9f880a62a
Author: wudi <676366...@qq.com>
AuthorDate: Thu Sep 14 15:53:59 2023 +0800

    [fix](oracle) add oracle regexp like compatible (#194)
    
    When include-tables has too many table names, and debezium incrementally 
reads, it will be judged based on `regexp_like`. When the regular length 
exceeds 512, an error will be reported, like `ORA-12733: regular expression too 
long`
---
 .../main/java/org/apache/doris/flink/lookup/ExecutionPool.java   | 4 ++--
 .../main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 4 ++++
 .../apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java  | 9 ++++++++-
 3 files changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
index 9b930ff..94cb35c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
@@ -104,9 +104,9 @@ public class ExecutionPool implements Closeable {
     public void close() throws IOException {
         if (started.compareAndSet(true, false)) {
             LOG.info("close executorService");
-            actionWatcherExecutorService.shutdownNow();
+            actionWatcherExecutorService.shutdown();
+            workerExecutorService.shutdown();
             workerStated.set(false);
-            workerExecutorService.shutdownNow();
             this.actionWatcherExecutorService = null;
             this.workerExecutorService = null;
             this.semaphore = null;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 455000e..e36590c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -61,6 +61,8 @@ public abstract class DatabaseSync {
     public StreamExecutionEnvironment env;
     private boolean createTableOnly = false;
     private boolean newSchemaChange;
+    protected String includingTables;
+    protected String excludingTables;
 
     public abstract Connection getConnection() throws SQLException;
 
@@ -76,6 +78,8 @@ public abstract class DatabaseSync {
         this.config = config;
         this.database = database;
         this.converter = new TableNameConverter(tablePrefix, tableSuffix);
+        this.includingTables = includingTables;
+        this.excludingTables = excludingTables;
         this.includingPattern = includingTables == null ? null : 
Pattern.compile(includingTables);
         this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
         this.ignoreDefaultValue = ignoreDefaultValue;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 3b4bc31..bfd974f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -107,11 +107,19 @@ public class OracleDatabaseSync extends DatabaseSync {
 
     @Override
     public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment 
env) {
+        Properties debeziumProperties = new Properties();
         String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
         String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
         Preconditions.checkNotNull(databaseName, "database-name in oracle is 
required");
         Preconditions.checkNotNull(schemaName, "schema-name in oracle is 
required");
         String tableName = config.get(OracleSourceOptions.TABLE_NAME);
+        //When debezium incrementally reads, it will be judged based on 
regexp_like.
+        //When the regular length exceeds 512, an error will be reported, like 
ORA-12733: regular expression too long
+        if(tableName.length() > 384){
+            //max database name length 128
+            tableName = StringUtils.isNullOrWhitespaceOnly(includingTables) ? 
".*" : includingTables;
+        }
+
         String url = config.get(OracleSourceOptions.URL);
         String hostname = config.get(OracleSourceOptions.HOSTNAME);
         Integer port = config.get(OracleSourceOptions.PORT);
@@ -127,7 +135,6 @@ public class OracleDatabaseSync extends DatabaseSync {
         }
 
         //debezium properties set
-        Properties debeziumProperties = new Properties();
         debeziumProperties.put("decimal.handling.mode", "string");
         //date to string
         debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to