This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push:
     new 885e862e7b Add a guide related to Mongo CDC to the flink-doris-connector doc (#974)
885e862e7b is described below

commit 885e862e7b9fb57fbce2ededec2b8a5a191ea10b
Author: bingquanzhao <bingquan_z...@icloud.com>
AuthorDate: Wed Aug 21 14:49:51 2024 +0800

    Add a guide related to Mongo CDC to the flink-doris-connector doc (#974)

    Add a guide related to Mongo CDC to the flink-doris-connector documentation.
---
 common_docs_zh/ecosystem/flink-doris-connector.md | 50 +++++++++++++++----
 ecosystem/flink-doris-connector.md                | 60 +++++++++++++++++------
 2 files changed, 86 insertions(+), 24 deletions(-)

diff --git a/common_docs_zh/ecosystem/flink-doris-connector.md b/common_docs_zh/ecosystem/flink-doris-connector.md
index 838ea76b5b..4fa1e3838c 100644
--- a/common_docs_zh/ecosystem/flink-doris-connector.md
+++ b/common_docs_zh/ecosystem/flink-doris-connector.md
@@ -509,13 +509,13 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
 ```

-## Use FlinkCDC to ingest multiple tables or an entire database (supports MySQL, Oracle, PostgreSQL, SQLServer)
+## Use Flink CDC to ingest multiple tables or an entire database (supports MySQL, Oracle, PostgreSQL, SQLServer, MongoDB)
 ### Syntax
 ```shell
 <FLINK_HOME>bin/flink run \
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
-    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
+    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database|mongodb-sync-database> \
     --database <doris-database-name> \
     [--job-name <flink-job-name>] \
     [--table-prefix <doris-table-prefix>] \
@@ -546,16 +546,19 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
 | --sqlserver-conf | SQLServer CDCSource configuration, e.g. --sqlserver-conf hostname=127.0.0.1. All SQLServer-CDC configurations can be found [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/sqlserver-cdc/); hostname/username/password/database-name/schema-name are required. |
 | --db2-conf | DB2 CDCSource configuration, e.g. --db2-conf hostname=127.0.0.1. All DB2-CDC configurations can be found [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/db2-cdc/); hostname/username/password/database-name/schema-name are required. |
 | --sink-conf | All configurations of the Doris Sink; the complete configuration items can be found [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9). |
+| --mongodb-conf | MongoDB CDCSource configuration, e.g. --mongodb-conf hosts=127.0.0.1:27017. All Mongo-CDC configurations can be found [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/flink-sources/mongodb-cdc/); hosts/username/password/database are required. --mongodb-conf schema.sample-percent sets how much MongoDB data is sampled automatically to create the Doris table; the default is 0.2. |
 | --table-conf | Configuration items of the Doris table, i.e. the content contained in properties (table-buckets is the exception; it is not a properties attribute). For example, `--table-conf replication_num=1`, while `--table-conf table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"` specifies the number of buckets for different tables in the order of the regular expressions; tables that match none of them are created with BUCKETS AUTO. |
-| --ignore-default-value | Turn off syncing the default values of the mysql table schema. Applies when syncing mysql data to doris and a field has a default value but the actually inserted data is null. See [#152](https://github.com/apache/doris-flink-connector/pull/152) |
-| --use-new-schema-change | Whether to use the new schema change, which supports syncing mysql multi-column changes and default values; since 1.6.0 this parameter defaults to true. See [#167](https://github.com/apache/doris-flink-connector/pull/167) |
+| --ignore-default-value | Turn off syncing the default values of the MySQL table schema. Applies when syncing MySQL data to Doris and a field has a default value but the actually inserted data is null. See [#152](https://github.com/apache/doris-flink-connector/pull/152) |
+| --use-new-schema-change | Whether to use the new schema change, which supports syncing MySQL multi-column changes and default values; since 1.6.0 this parameter defaults to true. See [#167](https://github.com/apache/doris-flink-connector/pull/167) |
 | --schema-change-mode | The mode for parsing schema changes; the `debezium_structure` and `sql_parser` parsing modes are supported, and `debezium_structure` is the default.<br/><br/> `debezium_structure` parses the data structure used when the upstream CDC synchronizes data and determines DDL change operations from that structure. <br/> `sql_parser` determines DDL change operations by parsing the DDL statements of the data the upstream CDC synchronizes, so this parsing mode is more accurate.<br/> Usage example: `--schema-change-mode debezium_structure`<br/> This feature is available in versions after 1.6.2.1 |
 | --single-sink | Whether to use a single Sink to synchronize all tables; when enabled, newly created upstream tables are automatically recognized and the corresponding tables created. |
 | --multi-to-one-origin | The configuration of the source tables when multiple upstream tables are written into one table, e.g.: --multi-to-one-origin="a\_.\*\|b_.\*"; see [#208](https://github.com/apache/doris-flink-connector/pull/208) |
 | --multi-to-one-target | Used together with multi-to-one-origin; the configuration of the target table, e.g.: --multi-to-one-target="a\|b" |
 | --create-table-only | Whether to synchronize only the table schema |

->Note: when synchronizing, the corresponding Flink CDC dependencies need to be added under the $FLINK_HOME/lib directory, e.g. flink-sql-connector-mysql-cdc-${version}.jar, flink-sql-connector-oracle-cdc-${version}.jar
+:::info Note
+When synchronizing, the corresponding Flink CDC dependencies need to be added under the $FLINK_HOME/lib directory, e.g. flink-sql-connector-mysql-cdc-${version}.jar, flink-sql-connector-oracle-cdc-${version}.jar, flink-sql-connector-mongodb-cdc-${version}.jar
+:::

 ### MySQL multi-table synchronization example
 ```shell
@@ -684,13 +687,40 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
     --table-conf replication_num=1
 ```

-## Use FlinkCDC to update the Key column
+### MongoDB multi-table synchronization example
+
+```shell
+<FLINK_HOME>/bin/flink run \
+    -Dexecution.checkpointing.interval=10s \
+    -Dparallelism.default=1 \
+    -c org.apache.doris.flink.tools.cdc.CdcTools \
+    ./lib/flink-doris-connector-1.18-1.6.2-SNAPSHOT.jar \
+    mongodb-sync-database \
+    --database doris_db \
+    --schema-change-mode debezium_structure \
+    --mongodb-conf hosts=127.0.0.1:27017 \
+    --mongodb-conf username=flinkuser \
+    --mongodb-conf password=flinkpwd \
+    --mongodb-conf database=test \
+    --mongodb-conf scan.startup.mode=initial \
+    --mongodb-conf schema.sample-percent=0.2 \
+    --including-tables "tbl1|tbl2" \
+    --sink-conf fenodes=127.0.0.1:8030 \
+    --sink-conf username=root \
+    --sink-conf password= \
+    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+    --sink-conf sink.label-prefix=label \
+    --sink-conf sink.enable-2pc=false \
+    --table-conf replication_num=1
+```
+
+## Use Flink CDC to update the Key column
 Generally, a business database uses a number as a table's primary key; the Student table, for example, uses the number (id) as its primary key. As the business grows, however, the number corresponding to a record may change.
-In this scenario, synchronizing data with FlinkCDC + Doris Connector automatically updates the data in the Doris primary key column.
+In this scenario, synchronizing data with Flink CDC + Doris Connector automatically updates the data in the Doris primary key column.
 ### Principle
 The underlying collection tool of Flink CDC is Debezium. Debezium uses the op field to identify the operation: the op field takes the values c, u, d, and r, corresponding to create, update, delete, and read.
-For an update of the primary key column, FlinkCDC sends DELETE and INSERT events downstream, and once the data is synchronized into Doris, the data in the primary key column is updated automatically.
+For an update of the primary key column, Flink CDC sends DELETE and INSERT events downstream, and once the data is synchronized into Doris, the data in the primary key column is updated automatically.
 ### Usage
 The Flink program can refer to the CDC synchronization examples above. After the task is submitted successfully, execute an Update statement on the primary key column (`update student set id = '1002' where id = '1001'`) on the MySQL side to modify the data in Doris.
@@ -742,7 +772,7 @@ from KAFKA_SOURCE;

 ### Application scenarios

-The most suitable scenario for the Flink Doris Connector is synchronizing source data (Mysql, Oracle, PostgreSQL, etc.) to Doris in real time or in batches; the Flink Doris Connector can also be used when Flink performs joint analysis on data in Doris and other data sources.
+The most suitable scenario for the Flink Doris Connector is synchronizing source data (MySQL, Oracle, PostgreSQL, etc.) to Doris in real time or in batches; the Flink Doris Connector can also be used when Flink performs joint analysis on data in Doris and other data sources.

 ### Other

@@ -810,7 +840,7 @@ Before Connector 1.1.0, writing was done in accumulated batches and driven by the data

 When Flink imports data, dirty data such as field format or length problems causes StreamLoad to report an error, and Flink then retries endlessly. If such data needs to be skipped, disable StreamLoad's strict mode (strict_mode=false, max_filter_ratio=1) or filter the data before the Sink operator.

 11. **How should the source table and the Doris table correspond?**
-When importing data with the Flink Connector, pay attention to two aspects: first, the columns and types of the source table must correspond to the columns and types in the flink sql; second, the columns and types in the flink sql must correspond to the columns and types of the doris table; for details, refer to the "Doris and Flink column type mapping" above
+When importing data with the Flink Connector, pay attention to two aspects: first, the columns and types of the source table must correspond to the columns and types in the Flink SQL; second, the columns and types in the Flink SQL must correspond to the columns and types of the Doris table; for details, refer to the "Doris and Flink column type mapping" above (a sketch follows this file's diff)

 12. **TApplicationException: get_next failed: out of sequence response: expected 4 but got 3**
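The column-mapping FAQ just above describes two alignment requirements. Here is a minimal Flink SQL sketch of both; all names are hypothetical placeholders (a MySQL table test.student, a Doris table doris_db.student, local connection values), and the point is only that the source table, the Flink DDL, and the Doris schema line up column for column.

```sql
-- Source: MySQL CDC table. Columns and types mirror the (hypothetical)
-- MySQL table test.student.
CREATE TABLE cdc_student_source (
    id   INT,
    name STRING,
    age  INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'test',
    'table-name' = 'student'
);

-- Sink: columns and types mirror the (hypothetical) Doris table
-- doris_db.student, per the "Doris and Flink column type mapping" section.
CREATE TABLE doris_student_sink (
    id   INT,
    name STRING,
    age  INT
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'doris_db.student',
    'username' = 'root',
    'password' = '',
    'sink.label-prefix' = 'doris_student_label'
);

INSERT INTO doris_student_sink SELECT id, name, age FROM cdc_student_source;
```

The same sketch applies to the corresponding FAQ in the English document whose diff follows.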
diff --git a/ecosystem/flink-doris-connector.md b/ecosystem/flink-doris-connector.md
index de7cf9f3a1..46d4d361a0 100644
--- a/ecosystem/flink-doris-connector.md
+++ b/ecosystem/flink-doris-connector.md
@@ -77,7 +77,7 @@ Copy this file to `classpath` of `Flink` to use `Flink-Doris-Connector`. For exa

 ## Instructions

-### read
+### Read

 **SQL**

@@ -116,7 +116,7 @@ DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
 env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
 ```

-### write
+### Write

 **SQL**

@@ -307,7 +307,7 @@ LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
 ON a.city = c.city
 ```

-## configuration
+## Configuration

 ### General configuration items

@@ -502,16 +502,16 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
 ```

-## Use FlinkCDC to access multiple tables or the entire database (Supports MySQL, Oracle, PostgreSQL, SQLServer)
+## Use Flink CDC to access multiple tables or the entire database (Supports MySQL, Oracle, PostgreSQL, SQLServer, MongoDB)

-### grammar
+### Syntax

 ```shell
 <FLINK_HOME>bin/flink run \
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
-    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
+    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database|mongodb-sync-database> \
     --database <doris-database-name> \
     [--job-name <flink-job-name>] \
     [--table-prefix <doris-table-prefix>] \
@@ -538,10 +538,14 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
 | --oracle-conf | Oracle CDCSource configuration, e.g. --oracle-conf hostname=127.0.0.1. All Oracle-CDC configurations can be found [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/oracle-cdc/); hostname/username/password/database-name/schema-name are required. |
 | --postgres-conf | Postgres CDCSource configuration, e.g. --postgres-conf hostname=127.0.0.1. All Postgres-CDC configurations can be found [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/); hostname/username/password/database-name/schema-name/slot.name are required. |
 | --sqlserver-conf | SQLServer CDCSource configuration, e.g. --sqlserver-conf hostname=127.0.0.1. All SQLServer-CDC configurations can be found [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/sqlserver-cdc/); hostname/username/password/database-name/schema-name are required. |
 | --db2-conf | DB2 CDCSource configuration, e.g. --db2-conf hostname=127.0.0.1. All DB2-CDC configurations can be found [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/db2-cdc/); hostname/username/password/database-name/schema-name are required. |
+| --mongodb-conf | MongoDB CDCSource configuration, e.g. --mongodb-conf hosts=127.0.0.1:27017. All Mongo-CDC configurations can be found [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/flink-sources/mongodb-cdc/); hosts/username/password/database are required. The --mongodb-conf schema.sample-percent configuration controls how much MongoDB data is sampled automatically to create the table in Doris, with a default value of 0.2. |
 | --sink-conf | All configurations of the Doris Sink; the complete configuration items can be found [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9). |
 | --table-conf | The configuration items of the Doris table, i.e. the content contained in properties (table-buckets is the exception; it is not a properties attribute). For example, `--table-conf replication_num=1`, while `--table-conf table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"` specifies the number of buckets for different tables based on the order of the regular expressions. If there is no match, the table is created with the default setting of BUCKETS AUTO. |
-| --ignore-default-value | Turn off the default value of synchronizing mysql table structure. It is suitable for synchronizing mysql data to doris when the field has a default value but the actual inserted data is null. Reference [here](https://github.com/apache/doris-flink-connector/pull/152) |
+| --ignore-default-value | Turn off synchronizing the default values of the MySQL table schema. Suitable for synchronizing MySQL data to Doris when a field has a default value but the actual inserted data is null. Reference [here](https://github.com/apache/doris-flink-connector/pull/152) |
 | --use-new-schema-change | Whether to use the new schema change, which supports synchronization of MySQL multi-column changes and default values. Since version 1.6.0, the default value has been set to true. Reference [here](https://github.com/apache/doris-flink-connector/pull/167) |
 | --schema-change-mode | The mode for parsing schema changes. Two parsing modes are supported: `debezium_structure` and `sql_parser`; the default mode is `debezium_structure`. <br/><br/> `debezium_structure` parses the data structure used when the upstream CDC synchronizes data, and determines DDL change operations by parsing this structure. <br/> `sql_parser` determines the DDL change operation by parsing the DDL statement when the upstream CDC synchronizes data, so this parsing mode is more accurate. <br/> Usage example: `--schema-change-mode debezium_structure` <br/> This feature is available in versions after 1.6.2.1 |
 | --single-sink | Whether to use a single Sink to synchronize all tables. When turned on, newly created tables in the upstream can also be automatically recognized and the corresponding tables automatically created. |
@@ -549,7 +553,9 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
 | --multi-to-one-target | Used with multi-to-one-origin; the configuration of the target table, such as: --multi-to-one-target="a\|b" |
 | --create-table-only | Whether only the table schema should be synchronized |

->Note: When synchronizing, you need to add the corresponding Flink CDC dependencies in the $FLINK_HOME/lib directory, such as flink-sql-connector-mysql-cdc-${version}.jar, flink-sql-connector-oracle-cdc-${version}.jar
+:::info Note
+When synchronizing, you need to add the corresponding Flink CDC dependencies in the $FLINK_HOME/lib directory, such as flink-sql-connector-mysql-cdc-${version}.jar, flink-sql-connector-oracle-cdc-${version}.jar, flink-sql-connector-mongodb-cdc-${version}.jar
+:::

 ### MySQL synchronization example

@@ -677,16 +683,42 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1
-
 ```

-## Use FlinkCDC to update Key column
+### MongoDB synchronization example
+
+```shell
+<FLINK_HOME>/bin/flink run \
+    -Dexecution.checkpointing.interval=10s \
+    -Dparallelism.default=1 \
+    -c org.apache.doris.flink.tools.cdc.CdcTools \
+    ./lib/flink-doris-connector-1.18-1.6.2-SNAPSHOT.jar \
+    mongodb-sync-database \
+    --database doris_db \
+    --schema-change-mode debezium_structure \
+    --mongodb-conf hosts=127.0.0.1:27017 \
+    --mongodb-conf username=flinkuser \
+    --mongodb-conf password=flinkpwd \
+    --mongodb-conf database=test \
+    --mongodb-conf scan.startup.mode=initial \
+    --mongodb-conf schema.sample-percent=0.2 \
+    --including-tables "tbl1|tbl2" \
+    --sink-conf fenodes=127.0.0.1:8030 \
+    --sink-conf username=root \
+    --sink-conf password= \
+    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+    --sink-conf sink.label-prefix=label \
+    --sink-conf sink.enable-2pc=false \
+    --table-conf replication_num=1
+```
+
+## Use Flink CDC to update Key column

 Generally, in a business database, a number is used as the primary key of a table; for example, the Student table uses the number (id) as its primary key. But as the business develops, the number corresponding to the data may change.
-In this scenario, using FlinkCDC + Doris Connector to synchronize data can automatically update the data in the Doris primary key column.
+In this scenario, using Flink CDC + Doris Connector to synchronize data can automatically update the data in the Doris primary key column.

 ### Principle
 The underlying collection tool of Flink CDC is Debezium. Debezium internally uses the op field to identify the corresponding operation: the values of the op field are c, u, d, and r, corresponding to create, update, delete, and read.
-For the update of the primary key column, FlinkCDC will send DELETE and INSERT events downstream, and after the data is synchronized to Doris, it will automatically update the data of the primary key column.
+For the update of the primary key column, Flink CDC will send DELETE and INSERT events downstream, and after the data is synchronized to Doris, the data of the primary key column is updated automatically.

 ### Example
 The Flink program can refer to the CDC synchronization example above. After the task is successfully submitted, execute an Update statement on the primary key column (`update student set id = '1002' where id = '1001'`) on the MySQL side to modify the data in Doris (see the sketch below).
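To make the principle above concrete, here is a hedged sketch of what the Doris target table for the student example might look like. The DDL is hypothetical (column set and sizes invented for illustration); the assumption being illustrated is that the target uses Doris's Unique Key model, so the DELETE event removes the row under the old key and the INSERT event re-writes it under the new key.

```sql
-- Hypothetical Doris DDL for the student example above.
-- With the Unique Key model, the DELETE event emitted for id = '1001'
-- removes the old row, and the INSERT event writes it back as id = '1002'.
CREATE TABLE student (
    id   VARCHAR(32),
    name VARCHAR(64),
    age  INT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);
```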
@@ -738,7 +770,7 @@ from KAFKA_SOURCE;

 ### Application scenarios

-The most suitable scenario for using Flink Doris Connector is to synchronize source data to Doris (Mysql, Oracle, PostgreSQL) in real time/batch, etc., and use Flink to perform joint analysis on data in Doris and other data sources. You can also use Flink Doris Connector
+The most suitable scenario for the Flink Doris Connector is synchronizing source data (MySQL, Oracle, PostgreSQL, etc.) to Doris in real time or in batches. The Flink Doris Connector can also be used when Flink performs joint analysis on data in Doris and other data sources.

 ### Other

@@ -806,7 +838,7 @@ It usually occurs before Connector1.1.0, because the writing frequency is too fa

 When Flink imports data, dirty data, such as field format or length problems, will cause StreamLoad to report an error, and Flink will continue to retry. If such data needs to be skipped, disable the strict mode of StreamLoad (strict_mode=false, max_filter_ratio=1) or filter the data before the Sink operator (see the sketch after this diff).

 11. **How should the source table and Doris table correspond?**
-When using Flink Connector to import data, pay attention to two aspects. The first is that the columns and types of the source table correspond to the columns and types in flink sql; the second is that the columns and types in flink sql must match those of the doris table For the correspondence between columns and types, please refer to the above "Doris & Flink Column Type Mapping" for details
+When using Flink Connector to import data, pay attention to two aspects: first, the columns and types of the source table must correspond to the columns and types in the Flink SQL; second, the columns and types in the Flink SQL must match those of the Doris table. For the correspondence between columns and types, refer to "Doris & Flink Column Type Mapping" above.

 12. **TApplicationException: get_next failed: out of sequence response: expected 4 but got 3**
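Following up on the dirty-data FAQ above, here is a minimal Flink SQL sketch of skipping dirty rows through the sink. It assumes the connector's `sink.properties.*` passthrough for Stream Load parameters; the table name and connection values are placeholders.

```sql
-- Hypothetical sink table; only the two sink.properties.* lines matter here.
CREATE TABLE doris_sink_skip_dirty (
    id   INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'db.tbl',
    'username' = 'root',
    'password' = '',
    'sink.label-prefix' = 'doris_skip_label',
    -- Disable Stream Load strict mode and tolerate filtered rows
    -- instead of failing (and endlessly retrying) the Flink job.
    'sink.properties.strict_mode' = 'false',
    'sink.properties.max_filter_ratio' = '1'
);
```

Note that max_filter_ratio=1 tolerates any proportion of filtered rows, so silent data loss is possible; filtering before the Sink operator is preferable when the dirty rows can be identified.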