chenyz1984 commented on issue #8463: URL: https://github.com/apache/seatunnel/issues/8463#issuecomment-2579079540
# 1. CDC 同步配置文件 ## 1.1. CDC 单表(Oracle -> MySQL) ```yml env { # stream_ora2mysql.conf parallelism = 2 job.mode = "STREAMING" checkpoint.interval = 5000 } source { # CDC 同步时,建议为每个表独立定义一个 Oracle-CDC 连接,以保证稳定性 Oracle-CDC { driver = "oracle.jdbc.driver.OracleDriver" base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 的连接串 username = "c##logminer" # 公共用户 Common User password = "logminer" database-names = ["PDB1"] # 监控的 PDB(大写) schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写) table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔 "PDB1.PROVINCE_LNJY.MY_OBJECTS" ] table-names-config = [ # 可以为特定表,显式指定主键列 { table = "PDB1.PROVINCE_LNJY.MY_OBJECTS" primaryKeys = ["OBJECT_ID"] } ] result_table_name = "ORA_MY_OBJECTS" use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。 source.reader.close.timeout = 120000 connection.pool.size = 1 schema-changes.enabled = true # 启用 schema evolution 功能 exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复 debezium { database.name = "CDB$ROOT" # 需指定 CDB 名称 database.pdb.name = "PDB1" # 需指定 PDB 名称 } } } transform { Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间 source_table_name = "ORA_MY_OBJECTS" result_table_name = "MYSQL_MY_OBJECTS" query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_MY_OBJECTS;" } } sink { jdbc { url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "sysz2024" database = "lnjy_frontdb" table = "${table_name}" # 目标端表名与源端一致 generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句 primary_keys = ["OBJECT_ID"] # generate_sink_sql 为 true 时,需指定主键 source_table_name = "MYSQL_MY_OBJECTS" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式 data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据; field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写 is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" } } ``` ## 1.2. CDC 多表(Oracle -> MySQL) ```yml env { # stream_ora2mysql_multi.conf parallelism = 4 job.mode = "STREAMING" checkpoint.interval = 5000 } source { # CDC 数据同步,建议为每个表独立设置一个 Oracle-CDC,以确保稳定性 Oracle-CDC { driver = "oracle.jdbc.driver.OracleDriver" # 固定为 oracle.jdbc.driver.OracleDriver base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 连接串 username = "c##logminer" # 源库需要创建,公共用户 Common User password = "logminer" database-names = ["PDB1"] # 监控的 PDB(大写) schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写) table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔 "PDB1.PROVINCE_LNJY.T_MY_TABLES" ] table-names-config = [ { table = "PDB1.PROVINCE_LNJY.T_MY_TABLES" primaryKeys = ["ID"] # 为表显式指定主键字段 } ] use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。 result_table_name = "ORA_MY_TABLES" # 将本阶段处理的数据,注册为数据集(或称临时表) source.reader.close.timeout = 120000 connection.pool.size = 1 schema-changes.enabled = true # 默认为 true。是否启用 schema evolution 功能,即自动推断 DDL 脚本 exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复 debezium { database.name = "CDB$ROOT" # 需指定 CDB 名称 database.pdb.name = "PDB1" # 需指定 PDB 名称 } } Oracle-CDC { driver = "oracle.jdbc.driver.OracleDriver" # 固定为 oracle.jdbc.driver.OracleDriver base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 连接串 username = "c##logminer" # 源库需要创建,公共用户 Common User password = "logminer" database-names = ["PDB1"] # 监控的 PDB(大写) schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写) table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔 "PDB1.PROVINCE_LNJY.T_ALL_TABLES" ] table-names-config = [ { table = "PDB1.PROVINCE_LNJY.T_ALL_TABLES" primaryKeys = ["ID"] # 为表显式指定主键字段 } ] use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。 result_table_name = "ORA_ALL_TABLES" # 将本阶段处理的数据,注册为数据集(或称临时表) source.reader.close.timeout = 120000 connection.pool.size = 1 schema-changes.enabled = true # 默认为 true。是否启用 schema evolution 功能,即自动推断 DDL 脚本 exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复 debezium { database.name = "CDB$ROOT" # 需指定 CDB 名称 database.pdb.name = "PDB1" # 需指定 PDB 名称 } } } transform { Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间 source_table_name = "ORA_MY_TABLES" result_table_name = "TRANS_MY_TABLES" query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_MY_TABLES;" } Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间 source_table_name = "ORA_ALL_TABLES" result_table_name = "TRANS_ALL_TABLES" query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_ALL_TABLES;" } } sink { jdbc { url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "sysz2024" database = "lnjy_frontdb" table = "${table_name}" # 目标端表名与源端一致 generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句 primary_keys = ["ID"] # generate_sink_sql 为 true 时,需指定主键 source_table_name = "TRANS_MY_TABLES" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式 data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据; field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写 is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" } jdbc { url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "sysz2024" database = "lnjy_frontdb" table = "${table_name}" # 目标端表名与源端一致 generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句 primary_keys = ["ID"] # generate_sink_sql 为 true 时,需指定主键 source_table_name = "TRANS_ALL_TABLES" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式 data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据; field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写 is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" } } ``` # 2. 踩到的坑 ## 2.1. ORA-00942: table or view does not exist 1. **问题现象** 批量同步阶段结束,在进入增量同步时,报错 `ORA-00942: table or view does not exist`,日志中还包含 `UPDATE LOG_MINING_FLUSH` 表的 SQL 语句。经过排查,发现 `c##logminer` 在 PDB 中包含 `LOG_MINING_FLUSH` 表;但在 CDB 中,该表不存在。 2. **解决方案** 在 CDB 的 `c##logminer` 用户下,手动建表 `LOG_MINING_FLUSH`: ```sql SQL> CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0)); SQL> INSERT INTO LOG_MINING_FLUSH VALUES (0); SQL> commit; ``` 3. **疑问** 猜测 `LOG_MINING_FLUSH` 表是用于记录 Redo 相关偏移量的。但为何在 PDB 中自动建该表,却未在 CDB 中自动建表?与 CDC 作业配置的 `base-url` 是否有关系? ## 2.2. ORA-65040: operation not allowed from within a pluggable database 1. **问题描述** `base-url` 设置为 pdb 名称时,在批量同步阶段可以正常读取数据。但是,进入增量同步阶段时,报错 `ORA-65040: operation not allowed from within a pluggable database`。 2. **问题解决** 为 `source` 增加如下配置: ```yml debezium { database.name = "CDB$ROOT" # 需指定 CDB 名称 database.pdb.name = "PDB1" # 需指定 PDB 名称 } ``` 增量同步可正常工作。 3. **疑问** 同步进程根据 `debezium` 的配置,执行 `CDB` 与 `PDB` 的切换操作。然后,执行 `SYS.DBMS_LOGMNR` 中的存储过程? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org