chenyz1984 commented on issue #8463:
URL: https://github.com/apache/seatunnel/issues/8463#issuecomment-2579079540

   # 1.  CDC 同步配置文件
   
   ## 1.1. CDC 单表(Oracle -> MySQL)
   
   ```yml
   env {
     # stream_ora2mysql.conf
     parallelism = 2
     job.mode = "STREAMING"
     checkpoint.interval = 5000
   }
   
   source {
     # CDC 同步时,建议为每个表独立定义一个 Oracle-CDC 连接,以保证稳定性
     Oracle-CDC {
       driver = "oracle.jdbc.driver.OracleDriver"
       base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1"  # 需要为 PDB 的连接串
       username = "c##logminer"            # 公共用户 Common User
       password = "logminer"
       database-names = ["PDB1"]           # 监控的 PDB(大写)
       schema-names = ["PROVINCE_LNJY"]    # 监控的 schema 列表(大写)
       table-names = [                     # 监控的 table 
列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔
           "PDB1.PROVINCE_LNJY.MY_OBJECTS"
       ]
       table-names-config = [              # 可以为特定表,显式指定主键列
         {
           table = "PDB1.PROVINCE_LNJY.MY_OBJECTS"
           primaryKeys = ["OBJECT_ID"]
         }
       ]
       result_table_name = "ORA_MY_OBJECTS"
       use_select_count = false             # 全量阶段,是否用 select count(*) 获取数据量;当 
count(*) 比 analyze table 速度快时,可设置为 true
       skip_analyze = false                 # 全量阶段,是否跳过 analyze 
table。如果表数据变更不频繁,可以设为true。
       source.reader.close.timeout = 120000
       connection.pool.size = 1
       schema-changes.enabled = true        # 启用 schema evolution 功能
       exactly_once = true                  # 默认为 false。是否启用数据的精确一次性处理。启用 
extractly one 语义,可确保数据不会重复
       debezium {
   
                   database.name = "CDB$ROOT" # 需指定 CDB 名称
                   database.pdb.name = "PDB1" # 需指定 PDB 名称
           }
       }
   }
   
   transform {
       Sql {  # 为目标表增加时间戳字段,以记录数据行的变更时间
           source_table_name = "ORA_MY_OBJECTS"   
           result_table_name = "MYSQL_MY_OBJECTS"
           query = "SELECT  *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME  FROM 
ORA_MY_OBJECTS;"
       }
   }
   
   sink {
       jdbc {
           url = 
"jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
           driver = "com.mysql.cj.jdbc.Driver"
           user = "root"
           password = "sysz2024"
           database = "lnjy_frontdb"
           table = "${table_name}"      # 目标端表名与源端一致
           generate_sink_sql = true     # 默认 false,是否自动生成目标端库表的 sql 语句
           primary_keys = ["OBJECT_ID"] # generate_sink_sql 为 true 时,需指定主键
           
           source_table_name = "MYSQL_MY_OBJECTS"
           schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式
           data_save_mode = "APPEND_DATA"  # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据;
           field_ide = "ORIGINAL"       # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 
转大写,LOWERCASE 转小写
   
           is_exactly_once = true       # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 
xa_data_source_class_name
           xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
       }
   }
   ```
   
   ## 1.2. CDC 多表(Oracle -> MySQL)
   
   ```yml
   env {
     # stream_ora2mysql_multi.conf
     parallelism = 4
     job.mode = "STREAMING"
     checkpoint.interval = 5000
   }
   
   source {
       # CDC 数据同步,建议为每个表独立设置一个 Oracle-CDC,以确保稳定性
       Oracle-CDC {
           driver = "oracle.jdbc.driver.OracleDriver" # 固定为 
oracle.jdbc.driver.OracleDriver
           base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1"  # 需要为 PDB 连接串
           username = "c##logminer"                # 源库需要创建,公共用户 Common User
           password = "logminer"
           database-names = ["PDB1"]               # 监控的 PDB(大写)
           schema-names = ["PROVINCE_LNJY"]        # 监控的 schema 列表(大写)
           table-names = [                         # 监控的 table 
列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔
               "PDB1.PROVINCE_LNJY.T_MY_TABLES"
           ]
           table-names-config = [              
               {
                   table = "PDB1.PROVINCE_LNJY.T_MY_TABLES"
                   primaryKeys = ["ID"]     # 为表显式指定主键字段
               }
           ]
           use_select_count = false                # 全量阶段,是否用 select count(*) 
获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true
           skip_analyze = false                    # 全量阶段,是否跳过 analyze 
table。如果表数据变更不频繁,可以设为true。
           result_table_name = "ORA_MY_TABLES"    # 将本阶段处理的数据,注册为数据集(或称临时表)
           source.reader.close.timeout = 120000    
           connection.pool.size = 1
           schema-changes.enabled = true           # 默认为 true。是否启用 schema 
evolution 功能,即自动推断 DDL 脚本 
           exactly_once = true                    # 默认为 false。是否启用数据的精确一次性处理。启用 
extractly one 语义,可确保数据不会重复
           debezium {         
                    database.name = "CDB$ROOT" # 需指定 CDB 名称
                    database.pdb.name = "PDB1" # 需指定 PDB 名称
            }
       }
       
       Oracle-CDC {
           driver = "oracle.jdbc.driver.OracleDriver" # 固定为 
oracle.jdbc.driver.OracleDriver
           base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1"  # 需要为 PDB 连接串
           username = "c##logminer"                # 源库需要创建,公共用户 Common User
           password = "logminer"
           database-names = ["PDB1"]               # 监控的 PDB(大写)
           schema-names = ["PROVINCE_LNJY"]        # 监控的 schema 列表(大写)
           table-names = [                         # 监控的 table 
列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔
               "PDB1.PROVINCE_LNJY.T_ALL_TABLES"
           ]
           table-names-config = [              
               {
                   table = "PDB1.PROVINCE_LNJY.T_ALL_TABLES"
                   primaryKeys = ["ID"]     # 为表显式指定主键字段
               }
           ]
           use_select_count = false                # 全量阶段,是否用 select count(*) 
获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true
           skip_analyze = false                    # 全量阶段,是否跳过 analyze 
table。如果表数据变更不频繁,可以设为true。
           result_table_name = "ORA_ALL_TABLES"    # 将本阶段处理的数据,注册为数据集(或称临时表)
           source.reader.close.timeout = 120000    
           connection.pool.size = 1
           schema-changes.enabled = true           # 默认为 true。是否启用 schema 
evolution 功能,即自动推断 DDL 脚本 
           exactly_once = true                    # 默认为 false。是否启用数据的精确一次性处理。启用 
extractly one 语义,可确保数据不会重复
           debezium {         
                    database.name = "CDB$ROOT" # 需指定 CDB 名称
                    database.pdb.name = "PDB1" # 需指定 PDB 名称
            }
       }
   }
   
   transform {
       Sql {  # 为目标表增加时间戳字段,以记录数据行的变更时间
           source_table_name = "ORA_MY_TABLES"   
           result_table_name = "TRANS_MY_TABLES"
           query = "SELECT  *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME  FROM 
ORA_MY_TABLES;"
       }
       Sql {  # 为目标表增加时间戳字段,以记录数据行的变更时间
           source_table_name = "ORA_ALL_TABLES"   
           result_table_name = "TRANS_ALL_TABLES"
           query = "SELECT  *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME  FROM 
ORA_ALL_TABLES;"
       }
   }
   
   sink {
       jdbc {
           url = 
"jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
           driver = "com.mysql.cj.jdbc.Driver"
           user = "root"
           password = "sysz2024"
           database = "lnjy_frontdb"
           table = "${table_name}"      # 目标端表名与源端一致
           generate_sink_sql = true     # 默认 false,是否自动生成目标端库表的 sql 语句
           primary_keys = ["ID"] # generate_sink_sql 为 true 时,需指定主键
           
           source_table_name = "TRANS_MY_TABLES"
           schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式
           data_save_mode = "APPEND_DATA"  # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据;
           field_ide = "ORIGINAL"       # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 
转大写,LOWERCASE 转小写
   
           is_exactly_once = true       # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 
xa_data_source_class_name
           xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
       }
       
       jdbc {
           url = 
"jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
           driver = "com.mysql.cj.jdbc.Driver"
           user = "root"
           password = "sysz2024"
           database = "lnjy_frontdb"
           table = "${table_name}"      # 目标端表名与源端一致
           generate_sink_sql = true     # 默认 false,是否自动生成目标端库表的 sql 语句
           primary_keys = ["ID"] # generate_sink_sql 为 true 时,需指定主键
           
           source_table_name = "TRANS_ALL_TABLES"
           schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式
           data_save_mode = "APPEND_DATA"  # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据;
           field_ide = "ORIGINAL"       # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 
转大写,LOWERCASE 转小写
   
           is_exactly_once = true       # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 
xa_data_source_class_name
           xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
       }
   }
   ```
   
   # 2. 踩到的坑
   
   ## 2.1. ORA-00942: table or view does not exist
   
   1. **问题现象**
   
   批量同步阶段结束,在进入增量同步时,报错 `ORA-00942: table or view does not exist`,日志中还包含 
`UPDATE LOG_MINING_FLUSH` 表的 SQL 语句。经过排查,发现 `c##logminer` 在 PDB 中包含 
`LOG_MINING_FLUSH` 表;但在 CDB 中,该表不存在。
   
   2. **解决方案**
   
   在 CDB 的 `c##logminer` 用户下,手动建表 `LOG_MINING_FLUSH`:
   
   ```sql
   SQL> CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0));
   SQL> INSERT INTO LOG_MINING_FLUSH VALUES (0);
   SQL> commit;
   ```
   
   3. **疑问**
   
   猜测 `LOG_MINING_FLUSH` 表是用于记录 Redo 相关偏移量的。但为何在 PDB 中自动建该表,却未在 CDB 中自动建表?与 CDC 
作业配置的 `base-url` 是否有关系?
   
   ## 2.2. ORA-65040: operation not allowed from within a pluggable database
   
   1. **问题描述**
   
   `base-url` 设置为 pdb 名称时,在批量同步阶段可以正常读取数据。但是,进入增量同步阶段时,报错 `ORA-65040: operation 
not allowed from within a pluggable database`。
   
   2. **问题解决**
   
   为 `source` 增加如下配置:
   
   ```yml
   debezium {
   
                   database.name = "CDB$ROOT" # 需指定 CDB 名称
                   database.pdb.name = "PDB1" # 需指定 PDB 名称
           }
   ```
   
   增量同步可正常工作。
   
   3. **疑问**
   
   同步进程根据 `debezium` 的配置,执行 `CDB` 与 `PDB` 的切换操作。然后,执行 `SYS.DBMS_LOGMNR` 中的存储过程?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to