This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d7b20a4e [Fix] Fix Oracle cdb and pdb model unable to create tables 
(#423)
d7b20a4e is described below

commit d7b20a4e0102983e7c2cca6e1a2ed5fb1ba9b0bc
Author: wudi <676366...@qq.com>
AuthorDate: Thu Jul 11 17:26:09 2024 +0800

    [Fix] Fix Oracle cdb and pdb model unable to create tables (#423)
---
 .../flink/tools/cdc/oracle/OracleDatabaseSync.java  | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 360351e4..beb6a677 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -44,6 +44,7 @@ import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -63,6 +64,7 @@ public class OracleDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(OracleDatabaseSync.class);
 
     private static final String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s";
+    private static final String PDB_KEY = "debezium.database.pdb.name";
 
     public OracleDatabaseSync() throws SQLException {
         super();
@@ -108,9 +110,11 @@ public class OracleDatabaseSync extends DatabaseSync {
     public List<SourceSchema> getSchemaList() throws Exception {
         String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
         String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+
         List<SourceSchema> schemaList = new ArrayList<>();
         LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
         try (Connection conn = getConnection()) {
+            setSessionToPdb(conn);
             DatabaseMetaData metaData = conn.getMetaData();
             try (ResultSet tables =
                     metaData.getTables(databaseName, schemaName, "%", new 
String[] {"TABLE"})) {
@@ -134,6 +138,23 @@ public class OracleDatabaseSync extends DatabaseSync {
         return schemaList;
     }
 
+    private void setSessionToPdb(Connection conn) throws SQLException {
+        String pdbName = null;
+        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+            String key = entry.getKey();
+            if (key.equals(PDB_KEY)) {
+                pdbName = entry.getValue();
+                break;
+            }
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(pdbName)) {
+            LOG.info("Found pdb name in config, set session to pdb to {}", 
pdbName);
+            try (Statement statement = conn.createStatement()) {
+                statement.execute("alter session set container=" + pdbName);
+            }
+        }
+    }
+
     @Override
     public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment 
env) {
         Properties debeziumProperties = new Properties();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to