This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push:
     new b1854bb5d87  [doc] update flink connector doc (#2171)

b1854bb5d87 is described below

commit b1854bb5d874266d57349cc8e6d930aefc05c0d8
Author: wudi <w...@selectdb.com>
AuthorDate: Tue Mar 11 17:30:53 2025 +0800

    [doc] update flink connector doc (#2171)

    ## Versions

    - [x] dev
    - [x] 3.0
    - [x] 2.1
    - [ ] 2.0

    ## Languages

    - [x] Chinese
    - [x] English

    ## Docs Checklist

    - [ ] Checked by AI
    - [ ] Test Cases Built
---
 docs/ecosystem/flink-doris-connector.md            | 68 ++++++++++++++++++---
 .../current/ecosystem/flink-doris-connector.md     | 70 +++++++++++++++++++---
 .../version-2.1/ecosystem/flink-doris-connector.md | 70 +++++++++++++++++++---
 .../version-3.0/ecosystem/flink-doris-connector.md | 70 +++++++++++++++++++---
 .../version-2.1/ecosystem/flink-doris-connector.md | 68 ++++++++++++++++++---
 .../version-3.0/ecosystem/flink-doris-connector.md | 68 ++++++++++++++++++---
 6 files changed, 369 insertions(+), 45 deletions(-)

diff --git a/docs/ecosystem/flink-doris-connector.md b/docs/ecosystem/flink-doris-connector.md
index c2835947b82..db024c29a74 100644
--- a/docs/ecosystem/flink-doris-connector.md
+++ b/docs/ecosystem/flink-doris-connector.md
@@ -738,6 +738,57 @@ After starting the Flink cluster, you can directly run the following command:
 --table-conf replication_num=1
 ```
 
+#### AWS Aurora MySQL Whole Database Synchronization
+
+```Shell
+<FLINK_HOME>bin/flink run \
+    -Dexecution.checkpointing.interval=10s \
+    -Dparallelism.default=1 \
+    -c org.apache.doris.flink.tools.cdc.CdcTools \
+    lib/flink-doris-connector-1.18-25.0.0.jar \
+    mysql-sync-database \
+    --database testwd \
+    --mysql-conf hostname=xxx.us-east-1.rds.amazonaws.com \
+    --mysql-conf port=3306 \
+    --mysql-conf username=admin \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name=test \
+    --mysql-conf server-time-zone=UTC \
+    --including-tables "student" \
+    --sink-conf fenodes=127.0.0.1:8030 \
+    --sink-conf username=root \
+    --sink-conf password= \
+    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+    --sink-conf sink.label-prefix=label \
+    --table-conf replication_num=1
+```
+
+#### AWS RDS MySQL Whole Database Synchronization
+
+```Shell
+<FLINK_HOME>bin/flink run \
+    -Dexecution.checkpointing.interval=10s \
+    -Dparallelism.default=1 \
+    -c org.apache.doris.flink.tools.cdc.CdcTools \
+    lib/flink-doris-connector-1.18-25.0.0.jar \
+    mysql-sync-database \
+    --database testwd \
+    --mysql-conf hostname=xxx.ap-southeast-1.rds.amazonaws.com \
+    --mysql-conf port=3306 \
+    --mysql-conf username=admin \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name=test \
+    --mysql-conf server-time-zone=UTC \
+    --including-tables "student" \
+    --sink-conf fenodes=127.0.0.1:8030 \
+    --sink-conf username=root \
+    --sink-conf password= \
+    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+    --sink-conf sink.label-prefix=label \
+    --table-conf replication_num=1
+```
+
+
 ## Usage Instructions
 ### Parameter Configuration
@@ -747,7 +798,7 @@ After starting the Flink cluster, you can directly run the following command:
 | Key | Default Value | Required | Comment |
 | ----------------------------- | ------------- | -------- | ------------------------------------------------------------ |
 | fenodes | -- | Y | Doris FE http addresses. Multiple addresses are supported and should be separated by commas. |
-| benodes | -- | N | Doris BE http addresses. Multiple addresses are supported and should be separated by commas. Refer to [#187](https://github.com/apache/doris-flink-connector/pull/187). |
+| benodes | -- | N | Doris BE http addresses. Multiple addresses are supported and should be separated by commas. |
 | jdbc-url | -- | N | JDBC connection information, such as jdbc:mysql://127.0.0.1:9030. |
 | table.identifier | -- | Y | Doris table name, such as db.tbl. |
 | username | -- | Y | Username for accessing Doris. |
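For DataStream jobs, the general connection items above map onto the `DorisOptions` builder. A minimal sketch follows; `setFenodes`, `setTableIdentifier`, `setUsername`, and `setPassword` are the builder setters shown in the connector's published examples, but treat the exact method set as version-dependent:

```java
import org.apache.doris.flink.cfg.DorisOptions;

public class DorisOptionsSketch {
    public static void main(String[] args) {
        // Equivalent of the table above: fenodes, table.identifier, username, password.
        DorisOptions options = DorisOptions.builder()
                .setFenodes("127.0.0.1:8030")  // FE http address(es), comma separated
                .setTableIdentifier("db.tbl")  // Doris table in db.tbl form
                .setUsername("root")
                .setPassword("")
                .build();
        System.out.println(options.getTableIdentifier());
    }
}
```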
@@ -763,10 +814,8 @@ After starting the Flink cluster, you can directly run the following command:
 | ----------------------------- | ------------- | -------- | ------------------------------------------------------------ |
 | doris.request.query.timeout | 21600s | N | The timeout for querying Doris. The default value is 6 hours. |
 | doris.request.tablet.size | 1 | N | The number of Doris Tablets corresponding to one Partition. The smaller this value is set, the more Partitions will be generated, which can increase the parallelism on the Flink side. However, it will also put more pressure on Doris. |
-| doris.batch.size | 1024 | N | The maximum number of rows read from BE at one time. Increasing this value can reduce the number of connections established between Flink and Doris, thereby reducing the additional time overhead caused by network latency. |
+| doris.batch.size | 4064 | N | The maximum number of rows read from BE at one time. Increasing this value can reduce the number of connections established between Flink and Doris, thereby reducing the additional time overhead caused by network latency. |
 | doris.exec.mem.limit | 8192mb | N | The memory limit for a single query. The default is 8GB. |
-| doris.deserialize.arrow.async | FALSE | N | Whether to support asynchronous conversion of Arrow format to the RowBatch required by the flink-doris-connector iteration. |
-| doris.deserialize.queue.size | 64 | N | The internal processing queue for asynchronous conversion of the Arrow format. It takes effect when doris.deserialize.arrow.async is set to true. |
 | source.use-flight-sql | FALSE | N | Whether to use Arrow Flight SQL for reading. |
 | source.flight-sql-port | - | N | The arrow_flight_sql_port of FE when using Arrow Flight SQL for reading. |
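The source items above correspond to `DorisReadOptions` in the DataStream API. A hedged sketch, assuming the builder setters mirror the option keys as in the connector's published examples (`setRequestTabletSize`, `setRequestBatchSize`, `setRequestQueryTimeoutS`):

```java
import org.apache.doris.flink.cfg.DorisReadOptions;

public class DorisReadOptionsSketch {
    public static void main(String[] args) {
        // Values shown are the documented defaults from the table above.
        DorisReadOptions readOptions = DorisReadOptions.builder()
                .setRequestTabletSize(1)         // doris.request.tablet.size
                .setRequestBatchSize(4064)       // doris.batch.size
                .setRequestQueryTimeoutS(21600)  // doris.request.query.timeout, in seconds
                .build();
    }
}
```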
@@ -848,9 +897,7 @@ After starting the Flink cluster, you can directly run the following command:
 | --sink-conf | All the configurations of the Doris Sink can be viewed [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#General Configuration Items). |
 | --mongodb-conf | The configuration of the MongoDB CDCSource, for example, --mongodb-conf hosts=127.0.0.1:27017. You can view all the configurations of Mongo-CDC [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/flink-sources/mongodb-cdc/). Among them, hosts, username, password, and database are required. --mongodb-conf schema.sample-percent is the configuration for automatically sampling MongoDB data to create tables in Doris, and the default [...]
 | --table-conf | The configuration items of the Doris table, that is, the content included in properties (except for table-buckets, which is not a properties attribute). For example, --table-conf replication_num=1, and --table-conf table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50" means specifying the number of buckets for different tables in the order of regular expressions. If there is no match, the BUCKETS AUTO method will be used to create tables. |
-| --ignore-default-value | Disable the default values of the MySQL table structure when synchronizing. It is applicable to the situation where there are default values for fields when synchronizing MySQL data to Doris, but the actual inserted data is null. Refer to [#152](https://github.com/apache/doris-flink-connector/pull/152). |
-| --use-new-schema-change | Whether to use the new schema change, which supports multi-column changes and default values in MySQL synchronization. Since version 1.6.0, this parameter is set to true by default. Refer to [#167](https://github.com/apache/doris-flink-connector/pull/167). |
-| --schema-change-mode | The modes for parsing schema change, including debezium_structure and sql_parser. The debezium_structure mode is used by default. The debezium_structure mode parses the data structure used when the upstream CDC synchronizes data and judges DDL change operations by parsing this structure. The sql_parser mode parses the DDL statements when the upstream CDC synchronizes data to judge DDL change operations, so this parsing mode is more accurate. Usage example: --s [...]
+| --schema-change-mode | The modes for parsing schema change, including debezium_structure and sql_parser. The debezium_structure mode is used by default. The debezium_structure mode parses the data structure used when the upstream CDC synchronizes data and judges DDL change operations by parsing this structure. The sql_parser mode parses the DDL statements when the upstream CDC synchronizes data to judge DDL change operations, so this parsing mode is more accurate. Usage example: --s [...]
 | --single-sink | Whether to use a single Sink to synchronize all tables. After enabling, it can also automatically identify newly created tables upstream and create tables automatically. |
 | --multi-to-one-origin | The configuration of the source tables when multiple upstream tables are written to the same table, for example: --multi-to-one-origin "a\_.\*\|b_.\*", refer to [#208](https://github.com/apache/doris-flink-connector/pull/208) |
 | --multi-to-one-target | Used in combination with multi-to-one-origin, the configuration of the target table, for example: --multi-to-one-target "a\|b" |
@@ -1040,6 +1087,13 @@ if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ from KAFKA_SOURCE;
 ```
 
+### Flink CDC Synchronize DDL Statements
+Generally, when synchronizing an upstream data source such as MySQL, adding or deleting a field upstream requires a matching Schema Change operation in Doris.
+
+For this scenario, you usually need to write a DataStream API program and use the JsonDebeziumSchemaSerializer provided by DorisSink, which performs the SchemaChange automatically. For details, please refer to [CDCSchemaChangeExample.java](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/example/CDCSchemaChangeExample.java).
+
+In the whole-database synchronization tool provided by the connector, no additional configuration is required: the upstream DDL is synchronized automatically and the SchemaChange operation is performed in Doris.
+
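To make the serializer's role concrete, here is a trimmed sketch in the spirit of the linked example. Method names follow recent connector releases, but the serializer's package has moved between versions, so treat the import path as an assumption:

```java
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;

public class CdcSchemaChangeSketch {
    // Builds a DorisSink that parses Debezium-format JSON and applies upstream
    // DDL to the target table as a Doris schema change.
    public static DorisSink<String> buildSink() {
        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes("127.0.0.1:8030")
                .setTableIdentifier("test.t1")   // placeholder target table
                .setUsername("root")
                .setPassword("")
                .build();

        DorisSink.Builder<String> builder = DorisSink.builder();
        builder.setDorisOptions(dorisOptions)
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(DorisExecutionOptions.builder()
                        .setLabelPrefix("label-doris")  // unique prefix per job for 2PC stream loads
                        .build())
                .setSerializer(JsonDebeziumSchemaSerializer.builder()
                        .setDorisOptions(dorisOptions)
                        .build());
        return builder.build();
    }
}
```

A MySQL CDC source created with `includeSchemaChanges(true)` is then wired to this sink via `env.fromSource(...).sinkTo(buildSink())`, as in the linked example.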
 ## Frequently Asked Questions (FAQ)
 
 1. **errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]**
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md
index b653a639e18..c433ff52457 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md
@@ -743,6 +743,56 @@ The Flink Doris Connector integrates [Flink CDC](https://nightlies.apache.org/flink
 --table-conf replication_num=1
 ```
 
+#### AWS Aurora MySQL Whole Database Synchronization
+
+```Shell
+<FLINK_HOME>bin/flink run \
+    -Dexecution.checkpointing.interval=10s \
+    -Dparallelism.default=1 \
+    -c org.apache.doris.flink.tools.cdc.CdcTools \
+    lib/flink-doris-connector-1.18-25.0.0.jar \
+    mysql-sync-database \
+    --database testwd \
+    --mysql-conf hostname=xxx.us-east-1.rds.amazonaws.com \
+    --mysql-conf port=3306 \
+    --mysql-conf username=admin \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name=test \
+    --mysql-conf server-time-zone=UTC \
+    --including-tables "student" \
+    --sink-conf fenodes=127.0.0.1:8030 \
+    --sink-conf username=root \
+    --sink-conf password= \
+    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+    --sink-conf sink.label-prefix=label \
+    --table-conf replication_num=1
+```
+
+#### AWS RDS MySQL Whole Database Synchronization
+
+```Shell
+<FLINK_HOME>bin/flink run \
+    -Dexecution.checkpointing.interval=10s \
+    -Dparallelism.default=1 \
+    -c org.apache.doris.flink.tools.cdc.CdcTools \
+    lib/flink-doris-connector-1.18-25.0.0.jar \
+    mysql-sync-database \
+    --database testwd \
+    --mysql-conf hostname=xxx.ap-southeast-1.rds.amazonaws.com \
+    --mysql-conf port=3306 \
+    --mysql-conf username=admin \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name=test \
+    --mysql-conf server-time-zone=UTC \
+    --including-tables "student" \
+    --sink-conf fenodes=127.0.0.1:8030 \
+    --sink-conf username=root \
+    --sink-conf password= \
+    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+    --sink-conf sink.label-prefix=label \
+    --table-conf replication_num=1
+```
+
 ## Usage Instructions
 ### Parameter Configuration
@@ -752,7 +802,7 @@ The Flink Doris Connector integrates [Flink CDC](https://nightlies.apache.org/flink
 | Key | Default Value | Required | Comment |
 | ----------------------------- | ------------- | -------- | ------------------------------------------------------------ |
 | fenodes | -- | Y | Doris FE http addresses; multiple addresses are supported, separated by commas |
-| benodes | -- | N | Doris BE http addresses; multiple addresses are supported, separated by commas; see [#187](https://github.com/apache/doris-flink-connector/pull/187) |
+| benodes | -- | N | Doris BE http addresses; multiple addresses are supported, separated by commas |
 | jdbc-url | -- | N | JDBC connection information, e.g. jdbc:mysql://127.0.0.1:9030 |
 | table.identifier | -- | Y | Doris table name, e.g. db.tbl |
 | username | -- | Y | Username for accessing Doris |
@@ -768,10 +818,8 @@ The Flink Doris Connector integrates [Flink CDC](https://nightlies.apache.org/flink
 | ----------------------------- | ------------- | -------- | ------------------------------------------------------------ |
 | doris.request.query.timeout | 21600s | N | The timeout for querying Doris; the default is 6 hours |
 | doris.request.tablet.size | 1 | N | The number of Doris Tablets corresponding to one Partition. The smaller this value, the more Partitions are generated, which increases parallelism on the Flink side but also puts more pressure on Doris. |
-| doris.batch.size | 1024 | N | The maximum number of rows read from BE at one time. Increasing this value reduces the number of connections established between Flink and Doris, and thus the extra overhead caused by network latency. |
+| doris.batch.size | 4064 | N | The maximum number of rows read from BE at one time. Increasing this value reduces the number of connections established between Flink and Doris, and thus the extra overhead caused by network latency. |
 | doris.exec.mem.limit | 8192mb | N | The memory limit for a single query; the default is 8GB |
-| doris.deserialize.arrow.async | FALSE | N | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for flink-doris-connector iteration |
-| doris.deserialize.queue.size | 64 | N | The internal processing queue for asynchronous conversion of the Arrow format; takes effect when doris.deserialize.arrow.async is true |
 | source.use-flight-sql | FALSE | N | Whether to use Arrow Flight SQL for reading |
 | source.flight-sql-port | - | N | The arrow_flight_sql_port of FE when reading via Arrow Flight SQL |
@@ -853,12 +901,10 @@ The Flink Doris Connector integrates [Flink CDC](https://nightlies.apache.org/flink
 | --sink-conf | All configurations of the Doris Sink; the complete list of options can be viewed [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#通用配置项). |
 | --mongodb-conf | Configuration of the MongoDB CDCSource, e.g. --mongodb-conf hosts=127.0.0.1:27017. All Mongo-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/flink-sources/mongodb-cdc/); hosts/username/password/database are required. --mongodb-conf schema.sample-percent configures automatic sampling of MongoDB data for creating tables in Doris; the default is 0.2 |
 | --table-conf | Configuration items of the Doris table, i.e. what goes into properties (table-buckets is the exception; it is not a properties attribute). For example --table-conf replication_num=1, while --table-conf table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50" specifies bucket counts for different tables in regular-expression order; tables with no match are created with BUCKETS AUTO. |
-| --ignore-default-value | Disable default values when synchronizing the MySQL table structure. Applicable when fields have default values but the actually inserted data is null when synchronizing MySQL data to Doris. See [#152](https://github.com/apache/doris-flink-connector/pull/152) |
-| --use-new-schema-change | Whether to use the new schema change, which supports synchronizing MySQL multi-column changes and default values; since 1.6.0 this parameter defaults to true. See [#167](https://github.com/apache/doris-flink-connector/pull/167) |
-| --schema-change-mode | The mode for parsing schema change; debezium_structure and sql_parser are supported, with debezium_structure used by default. debezium_structure parses the data structure used when the upstream CDC synchronizes data and determines DDL changes from that structure. sql_parser parses the DDL statements of the upstream CDC stream to determine DDL changes, so this mode is more accurate. Usage example: --schema-change-mode debezium_structure. This feature will be available in versions after 24.0.0 |
+| --schema-change-mode | The mode for parsing schema change; debezium_structure and sql_parser are supported, with debezium_structure used by default. debezium_structure parses the data structure used when the upstream CDC synchronizes data and determines DDL changes from that structure. sql_parser parses the DDL statements of the upstream CDC stream to determine DDL changes, so this mode is more accurate. Usage example: --schema-change-mode debezium_structure. Supported after 24.0.0 |
 | --single-sink | Whether to use a single Sink to synchronize all tables; once enabled, newly created upstream tables are also recognized and created automatically. |
 | --multi-to-one-origin | Configuration of the source tables when multiple upstream tables are written into one table, e.g. --multi-to-one-origin "a\_.\*\|b_.\*"; see [#208](https://github.com/apache/doris-flink-connector/pull/208) |
-| --multi-to-one-target | Used together with multi-to-one-origin; configuration of the target table, e.g. --multi-to-one-target "a\ |
+| --multi-to-one-target | Used together with multi-to-one-origin; configuration of the target table, e.g. --multi-to-one-target "a\|b" |
 | --create-table-only | Whether to synchronize only the table structure |
 
 ### Type Mapping
@@ -1043,6 +1089,14 @@ if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ from KAFKA_SOURCE;
 ```
 
+### Flink CDC Synchronize DDL Statements
+Generally, when synchronizing an upstream data source such as MySQL, adding or deleting a field upstream requires a matching Schema Change operation in Doris.
+
+For this scenario, you usually need to write a DataStream API program and use the JsonDebeziumSchemaSerializer provided by DorisSink, which performs the SchemaChange automatically. For details, refer to [CDCSchemaChangeExample.java](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/example/CDCSchemaChangeExample.java).
+
+In the whole-database synchronization tool provided by the connector, no additional configuration is required: the upstream DDL is synchronized automatically and the SchemaChange operation is performed in Doris.
+
+
 ## FAQ
 
 1. **errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]**
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/flink-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/flink-doris-connector.md
index b653a639e18..c433ff52457 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/flink-doris-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/flink-doris-connector.md
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/flink-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/flink-doris-connector.md
index b653a639e18..c433ff52457 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/flink-doris-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/flink-doris-connector.md
diff --git a/versioned_docs/version-2.1/ecosystem/flink-doris-connector.md b/versioned_docs/version-2.1/ecosystem/flink-doris-connector.md
index c2835947b82..db024c29a74 100644
--- a/versioned_docs/version-2.1/ecosystem/flink-doris-connector.md
+++ b/versioned_docs/version-2.1/ecosystem/flink-doris-connector.md
diff --git a/versioned_docs/version-3.0/ecosystem/flink-doris-connector.md b/versioned_docs/version-3.0/ecosystem/flink-doris-connector.md
index c2835947b82..db024c29a74 100644
--- a/versioned_docs/version-3.0/ecosystem/flink-doris-connector.md
+++ b/versioned_docs/version-3.0/ecosystem/flink-doris-connector.md