xiangmingtao opened a new issue, #60942: URL: https://github.com/apache/doris/issues/60942
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.

### Version

Doris 4.0.2

### What's Wrong?

When executing the Flink CDC incremental synchronization script, the following error is reported:

```
[ERROR] Could not execute SQL statement. Reason:
org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "schema_version" (class org.apache.doris.flink.rest.models.Schema), not marked as ignorable (3 known properties: "status", "properties", "keysType"])
 at [Source: (String)"{"schema_version":0,"keysType":"UNIQUE_KEYS","materialized_indexes":{"zb_yyxx_cdc":{"schema_version":0,"storage_type":"COLUMN","schema_hash":1499721277,"keys_type":"UNIQUE_KEYS","columns":[{"aggregation_type":"","column_uid":"0","is_nullable":"No","is_key":"Yes","name":"gsgsuuid","comment":"公司工商uuid","type":"CHAR"},{"aggregation_type":"","column_uid":"1","is_nullable":"Yes","is_key":"Yes","name":"zbsclxdm","comment":"资本市场类型代码","type":"CHAR"},{"aggregation_type":"","column_uid":"2","is_nullable":"[truncated 2123 chars]; line: 1, column: 20] (through reference chain: org.apache.doris.flink.rest.models.Schema["schema_version"])
```

### What You Expected?

The incremental synchronization should succeed.

### How to Reproduce?

## Step 1: Create the installation directories

```shell
# Create the mount directory structure required by Flink:
# - lib holds the driver and connector JARs
# - conf holds the Flink configuration file
mkdir -p /mnt/application/flink/{lib,conf}
```

## Step 2: Prepare the docker-compose.yml file

Save the following content as `docker-compose.yml` and upload it to the `/mnt/application/flink` directory.

```yaml
services:
  flink-jobmanager:
    image: flink:1.18.1-scala_2.12
    container_name: flink-jobmanager-1.18.1
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - TZ=Asia/Shanghai
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        jobmanager.memory.process.size: 1024m
        rest.bind-port: 8081
        taskmanager.numberOfTaskSlots: 2
        env.timezone: Asia/Shanghai
    volumes:
      - /mnt/application/flink/lib/flink-sql-connector-mysql-cdc-3.2.1.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.1.jar
      - /mnt/application/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar
      - /mnt/application/flink/lib/mysql-connector-java-8.0.13.jar:/opt/flink/lib/mysql-connector-java-8.0.13.jar
      - /mnt/application/flink/lib/flink-doris-connector-1.18-25.1.0.jar:/opt/flink/lib/flink-doris-connector-1.18-25.1.0.jar
      - /mnt/application/flink/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
    restart: always
    networks:
      - flink-network

  flink-taskmanager:
    image: flink:1.18.1-scala_2.12
    container_name: flink-taskmanager-1.18.1
    command: taskmanager
    environment:
      - TZ=Asia/Shanghai
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.memory.process.size: 1024m
        taskmanager.numberOfTaskSlots: 2
        env.timezone: Asia/Shanghai
    volumes:
      - /mnt/application/flink/lib/flink-sql-connector-mysql-cdc-3.2.1.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.1.jar
      - /mnt/application/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar
      - /mnt/application/flink/lib/mysql-connector-java-8.0.13.jar:/opt/flink/lib/mysql-connector-java-8.0.13.jar
      - /mnt/application/flink/lib/flink-doris-connector-1.18-25.1.0.jar:/opt/flink/lib/flink-doris-connector-1.18-25.1.0.jar
      - /mnt/application/flink/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
    depends_on:
      - flink-jobmanager
    restart: always
    networks:
      - flink-network

networks:
  flink-network:
    driver: bridge
    name: flink-business-network-1.18.1
```

## Step 3: Prepare the driver and connector JARs

Upload the following JARs to the `/mnt/application/flink/lib` directory (see the download sketch after this list):

- `flink-doris-connector-1.18-25.1.0.jar`: connector for writing from Flink to Doris
- `flink-sql-connector-mysql-cdc-3.2.1.jar`: MySQL CDC data capture connector
- `flink-connector-jdbc-3.2.0-1.19.jar`: generic Flink JDBC connector
- `mysql-connector-java-8.0.13.jar`: MySQL JDBC driver
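If these JARs are not already available locally, a sketch like the following can fetch them into the lib directory. The Maven Central paths are an assumption based on the usual coordinates of these artifacts, so please verify each URL before running.

```shell
# Assumed Maven Central locations for the four JARs; verify each path before use.
cd /mnt/application/flink/lib
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.18/25.1.0/flink-doris-connector-1.18-25.1.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.13/mysql-connector-java-8.0.13.jar
```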
## Step 4: Start the Flink cluster

```shell
# Go to the Flink installation directory
cd /mnt/application/flink

# Stop and remove any existing containers
docker-compose down

# Start the Flink JobManager and TaskManager containers in the background
docker-compose up -d
```

## Step 5: Enter the container and start the SQL Client

```shell
# Enter the Flink JobManager container
docker exec -it flink-jobmanager-1.18.1 bash

# Start the Flink SQL Client (interactive SQL command line)
./bin/sql-client.sh
```

## Step 6: Run the incremental data synchronization SQL script

Execute the following statements in order in the Flink SQL Client:

```sql
-- Create the result table used for testing on the target (Doris) side
DROP TABLE IF EXISTS jck.zb_yyxx_cdc;
CREATE TABLE jck.`zb_yyxx_cdc` (
  `gsgsuuid` char(32) NOT NULL COMMENT "公司工商uuid",
  `zbsclxdm` char(2) DEFAULT NULL COMMENT "资本市场类型代码",
  `zbyyjzdm` char(4) NOT NULL COMMENT "资本运营进展代码",
  `zbyyrq` date NOT NULL COMMENT "资本运营日期",
  `zbyyztdm` char(1) NOT NULL COMMENT "资本状态代码",
  `sfdqzt` char(1) NOT NULL COMMENT "是否当前状态",
  `cjrq` datetime NOT NULL COMMENT "创建日期",
  `gxrq` datetime NOT NULL COMMENT "更新日期",
  INDEX idx_gsgsuuid(`gsgsuuid`) USING INVERTED,
  INDEX idx_zbyyrq(`zbyyrq`) USING INVERTED,
  INDEX idx_sfdqzt(`sfdqzt`) USING INVERTED,
  INDEX idx_zbyyztdm(`zbyyztdm`) USING INVERTED,
  INDEX idx_zbyyjzdm(`zbyyjzdm`) USING INVERTED,
  INDEX idx_zbsclxdm(`zbsclxdm`) USING INVERTED
) ENGINE=OLAP
UNIQUE KEY(`gsgsuuid`, `zbsclxdm`, `zbyyjzdm`, `zbyyrq`)
COMMENT "资本运营信息"
DISTRIBUTED BY HASH(`gsgsuuid`) BUCKETS 4
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1",
  "enable_unique_key_merge_on_write" = "true",
  "compression" = "LZ4",
  "storage_medium" = "ssd"
);

-- Enable checkpointing every 10 seconds for data consistency and fault tolerance
SET 'execution.checkpointing.interval' = '10s';

-- =============================================
-- Create the MySQL CDC source table: zb_yyxx_source
-- Maps to table zb_yyxx in MySQL database zxk
-- =============================================
DROP TABLE IF EXISTS zb_yyxx_source;
CREATE TABLE zb_yyxx_source (
  gsuuid STRING COMMENT '公司uuid',
  zbsclxdm CHAR(2) COMMENT '资本市场类型代码',
  zbyyjzdm CHAR(4) NOT NULL COMMENT '资本运营进展代码',
  zbyyrq DATE NOT NULL COMMENT '资本运营日期',
  zbyyztdm CHAR(1) NOT NULL COMMENT '资本状态代码',
  sfdqzt CHAR(1) NOT NULL COMMENT '是否当前状态',
  cjrq TIMESTAMP NOT NULL COMMENT '创建日期',
  gxrq TIMESTAMP NOT NULL COMMENT '更新日期',
  PRIMARY KEY (gsuuid, zbsclxdm, zbyyjzdm, zbyyrq) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'XXX',
  'port' = '3306',
  'username' = 'XXX',
  'password' = 'XXX',
  'database-name' = 'zxk',
  'table-name' = 'zb_yyxx'
);

-- =============================================
-- Create the MySQL CDC source lookup table: gs_jbxx_source
-- Maps to table gs_jbxx in MySQL database jck
-- Used to look up the company business-registration uuid via gsuuid
-- =============================================
DROP TABLE IF EXISTS gs_jbxx_source;
CREATE TABLE gs_jbxx_source (
  uuid CHAR(32) NOT NULL COMMENT '公司工商uuid',
  gsuuid STRING COMMENT '公司uuid',
  PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'XXX',
  'port' = '3306',
  'username' = 'XXX',
  'password' = 'XXX',
  'database-name' = 'jck',
  'table-name' = 'gs_jbxx'
);

-- =============================================
-- Create the Doris sink mapping table: zb_yyxx_sink
-- Maps to table zb_yyxx_cdc in Doris database jck
-- Written through the Doris connector using Stream Load
-- =============================================
DROP TABLE IF EXISTS zb_yyxx_sink;
CREATE TABLE zb_yyxx_sink (
  gsgsuuid CHAR(32) NOT NULL COMMENT '公司工商uuid',
  zbsclxdm CHAR(2) NULL COMMENT '资本市场类型代码',
  zbyyjzdm CHAR(4) NOT NULL COMMENT '资本运营进展代码',
  zbyyrq DATE NOT NULL COMMENT '资本运营日期',
  zbyyztdm CHAR(1) NOT NULL COMMENT '资本状态代码',
  sfdqzt CHAR(1) NOT NULL COMMENT '是否当前状态',
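For context, the JSON that the connector fails to deserialize is the table schema returned by the Doris FE HTTP API. Assuming the standard `_schema` endpoint, the raw response can be inspected directly; the FE address and credentials below are the placeholders from this setup.

```shell
# Fetch the schema JSON that flink-doris-connector maps onto
# org.apache.doris.flink.rest.models.Schema; on Doris 4.0.2 the response
# carries extra fields such as "schema_version" and "materialized_indexes".
# 10.10.100.15:8030 and XXX:XXX are the FE address / credentials used above.
curl -u XXX:XXX http://10.10.100.15:8030/api/jck/zb_yyxx_cdc/_schema
```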
  cjrq TIMESTAMP NOT NULL COMMENT '创建日期',
  gxrq TIMESTAMP NOT NULL COMMENT '更新日期',
  PRIMARY KEY (gsgsuuid, zbsclxdm, zbyyjzdm, zbyyrq) NOT ENFORCED
) WITH (
  'connector' = 'doris',
  'fenodes' = '10.10.100.15:8030',               -- Doris FE node address and HTTP port
  'table.identifier' = 'jck.zb_yyxx_cdc',        -- target table identifier: database.table
  'username' = 'XXX',
  'password' = 'XXX',
  'sink.properties.format' = 'json',             -- write in JSON format
  'sink.properties.read_json_by_line' = 'true',  -- parse JSON line by line
  'sink.enable-delete' = 'true',                 -- propagate DELETE events (CDC deletes)
  'sink.label-prefix' = 'doris_zb_yyxx_label'    -- Stream Load label prefix, must be globally unique
);

-- =============================================
-- Run the incremental synchronization statement
-- Join zb_yyxx_source with gs_jbxx_source on gsuuid
-- and write the company business-registration uuid (gsgsuuid) into Doris
-- =============================================
INSERT INTO zb_yyxx_sink
SELECT
  gs.uuid AS gsgsuuid,  -- use the business-registration uuid from the lookup table as the target primary key
  src.zbsclxdm,
  src.zbyyjzdm,
  src.zbyyrq,
  src.zbyyztdm,
  src.sfdqzt,
  src.cjrq,
  src.gxrq
FROM zb_yyxx_source src
JOIN (
  SELECT uuid, gsuuid
  FROM gs_jbxx_source
  WHERE gsuuid IS NOT NULL
) gs ON src.gsuuid = gs.gsuuid
WHERE src.gsuuid IS NOT NULL;
```

Running the Flink CDC incremental synchronization statement then fails with the same error shown under "What's Wrong?" above: `Unrecognized field "schema_version" (class org.apache.doris.flink.rest.models.Schema), not marked as ignorable`.

### Anything Else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
