This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new d7b20a4e [Fix] Fix Oracle cdb and pdb model unable to create tables (#423) d7b20a4e is described below commit d7b20a4e0102983e7c2cca6e1a2ed5fb1ba9b0bc Author: wudi <676366...@qq.com> AuthorDate: Thu Jul 11 17:26:09 2024 +0800 [Fix] Fix Oracle cdb and pdb model unable to create tables (#423) --- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 360351e4..beb6a677 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -44,6 +44,7 @@ import java.sql.DatabaseMetaData; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -63,6 +64,7 @@ public class OracleDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(OracleDatabaseSync.class); private static final String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s"; + private static final String PDB_KEY = "debezium.database.pdb.name"; public OracleDatabaseSync() throws SQLException { super(); @@ -108,9 +110,11 @@ public class OracleDatabaseSync extends DatabaseSync { public List<SourceSchema> getSchemaList() throws Exception { String databaseName = config.get(OracleSourceOptions.DATABASE_NAME); String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME); + List<SourceSchema> schemaList = new ArrayList<>(); LOG.info("database-name {}, schema-name {}", databaseName, schemaName); try (Connection conn = getConnection()) { + setSessionToPdb(conn); DatabaseMetaData metaData = conn.getMetaData(); try (ResultSet tables = metaData.getTables(databaseName, schemaName, "%", new String[] {"TABLE"})) { @@ -134,6 +138,23 @@ public class OracleDatabaseSync extends DatabaseSync { return schemaList; } + private void setSessionToPdb(Connection conn) throws SQLException { + String pdbName = null; + for (Map.Entry<String, String> entry : config.toMap().entrySet()) { + String key = entry.getKey(); + if (key.equals(PDB_KEY)) { + pdbName = entry.getValue(); + break; + } + } + if (!StringUtils.isNullOrWhitespaceOnly(pdbName)) { + LOG.info("Found pdb name in config, set session to pdb to {}", pdbName); + try (Statement statement = conn.createStatement()) { + statement.execute("alter session set container=" + pdbName); + } + } + } + @Override public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) { Properties debeziumProperties = new Properties(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org