This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new da648ac [fix](oracle) add oracle regexp like compatible (#194) da648ac is described below commit da648ac4c1a53bb5769b1fc193e054f9f880a62a Author: wudi <676366...@qq.com> AuthorDate: Thu Sep 14 15:53:59 2023 +0800 [fix](oracle) add oracle regexp like compatible (#194) When include-tables has too many table names, and debezium incrementally reads, it will be judged based on `regexp_like`. When the regular length exceeds 512, an error will be reported, like `ORA-12733: regular expression too long` --- .../main/java/org/apache/doris/flink/lookup/ExecutionPool.java | 4 ++-- .../main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 4 ++++ .../apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java | 9 ++++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java index 9b930ff..94cb35c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java @@ -104,9 +104,9 @@ public class ExecutionPool implements Closeable { public void close() throws IOException { if (started.compareAndSet(true, false)) { LOG.info("close executorService"); - actionWatcherExecutorService.shutdownNow(); + actionWatcherExecutorService.shutdown(); + workerExecutorService.shutdown(); workerStated.set(false); - workerExecutorService.shutdownNow(); this.actionWatcherExecutorService = null; this.workerExecutorService = null; this.semaphore = null; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 455000e..e36590c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -61,6 +61,8 @@ public abstract class DatabaseSync { public StreamExecutionEnvironment env; private boolean createTableOnly = false; private boolean newSchemaChange; + protected String includingTables; + protected String excludingTables; public abstract Connection getConnection() throws SQLException; @@ -76,6 +78,8 @@ public abstract class DatabaseSync { this.config = config; this.database = database; this.converter = new TableNameConverter(tablePrefix, tableSuffix); + this.includingTables = includingTables; + this.excludingTables = excludingTables; this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables); this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); this.ignoreDefaultValue = ignoreDefaultValue; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 3b4bc31..bfd974f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -107,11 +107,19 @@ public class OracleDatabaseSync extends DatabaseSync { @Override public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) { + Properties debeziumProperties = new Properties(); String databaseName = config.get(OracleSourceOptions.DATABASE_NAME); String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME); Preconditions.checkNotNull(databaseName, "database-name in oracle is required"); Preconditions.checkNotNull(schemaName, "schema-name in oracle is required"); String tableName = config.get(OracleSourceOptions.TABLE_NAME); + //When debezium incrementally reads, it will be judged based on regexp_like. + //When the regular length exceeds 512, an error will be reported, like ORA-12733: regular expression too long + if(tableName.length() > 384){ + //max database name length 128 + tableName = StringUtils.isNullOrWhitespaceOnly(includingTables) ? ".*" : includingTables; + } + String url = config.get(OracleSourceOptions.URL); String hostname = config.get(OracleSourceOptions.HOSTNAME); Integer port = config.get(OracleSourceOptions.PORT); @@ -127,7 +135,6 @@ public class OracleDatabaseSync extends DatabaseSync { } //debezium properties set - Properties debeziumProperties = new Properties(); debeziumProperties.put("decimal.handling.mode", "string"); //date to string debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org