This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push:
     new 1faa64dad72  [doc](import) add tp load to datasource (#1882)
1faa64dad72 is described below

commit 1faa64dad72ae9d8f4b1587a819b962e0b3bb714
Author: wudi <w...@selectdb.com>
AuthorDate: Thu Jan 23 14:16:17 2025 +0800

    [doc](import) add tp load to datasource (#1882)

    ## Versions

    - [x] dev
    - [x] 3.0
    - [x] 2.1
    - [ ] 2.0

    ## Languages

    - [x] Chinese
    - [x] English

    ## Docs Checklist

    - [ ] Checked by AI
    - [ ] Test Cases Built
---
 .../data-source/migrate-data-from-other-oltp.md    | 182 +++++++++++++++++++++
 .../data-source/migrate-data-from-other-oltp.md    | 182 +++++++++++++++++++++
 .../data-source/migrate-data-from-other-oltp.md    | 182 +++++++++++++++++++++
 .../data-source/migrate-data-from-other-oltp.md    | 182 +++++++++++++++++++++
 sidebars.json                                      |   3 +-
 .../data-source/migrate-data-from-other-oltp.md    | 182 +++++++++++++++++++++
 .../data-source/migrate-data-from-other-oltp.md    | 182 +++++++++++++++++++++
 versioned_sidebars/version-2.1-sidebars.json       |   3 +-
 versioned_sidebars/version-3.0-sidebars.json       |   3 +-
 9 files changed, 1098 insertions(+), 3 deletions(-)

diff --git a/docs/data-operate/import/data-source/migrate-data-from-other-oltp.md b/docs/data-operate/import/data-source/migrate-data-from-other-oltp.md
new file mode 100644
index 00000000000..6f571b8dda9
--- /dev/null
+++ b/docs/data-operate/import/data-source/migrate-data-from-other-oltp.md
@@ -0,0 +1,182 @@
---
{
    "title": "Migrating Data from Other OLTP",
    "language": "en"
}
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->


There are several ways to migrate data from other TP systems, such as MySQL, SQL Server, or Oracle, to Doris.

## Multi-Catalog

Use a Catalog to map the source tables as external tables, then load the data with INSERT INTO or CREATE-TABLE-AS-SELECT statements.

For example, with MySQL:

```sql
CREATE CATALOG mysql_catalog PROPERTIES(
    'type' = 'jdbc',
    'user' = 'root',
    'password' = '123456',
    'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db',
    'driver_url' = 'mysql-connector-java-8.0.25.jar',
    'driver_class' = 'com.mysql.cj.jdbc.Driver'
);

-- Load via INSERT
INSERT INTO internal.doris_db.tbl1
SELECT * FROM mysql_catalog.mysql_db.table1;

-- Load via CTAS
CREATE TABLE internal.doris_db.tbl1
PROPERTIES('replication_num' = '1')
AS
SELECT * FROM mysql_catalog.mysql_db.table1;
```

For more details, refer to [Catalog Data Load](../../../lakehouse/catalog-overview.md#data-import).
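Because the JDBC external table and the Doris table are visible in the same session, a quick row-count comparison can confirm that the load is complete. A minimal sketch, assuming the catalog and table names from the example above:

```sql
-- Row count in the source, read through the JDBC catalog
SELECT COUNT(*) FROM mysql_catalog.mysql_db.table1;

-- Row count in the Doris target table; the two results should match
SELECT COUNT(*) FROM internal.doris_db.tbl1;
```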
## Flink Doris Connector

You can use Flink for both offline and real-time synchronization of TP systems.

- Offline synchronization can use Flink's JDBC Source and the Doris Sink to load the data. The Doris sink table must be created beforehand (see the sketch after this list). For example, using Flink SQL:

  ```sql
  CREATE TABLE student_source (
      id INT,
      name STRING,
      age INT,
      PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'students',
      'username' = 'username',
      'password' = 'password'
  );

  CREATE TABLE student_sink (
      id INT,
      name STRING,
      age INT
  )
  WITH (
      'connector' = 'doris',
      'fenodes' = '127.0.0.1:8030',
      'table.identifier' = 'test.students',
      'username' = 'root',
      'password' = 'password',
      'sink.label-prefix' = 'doris_label'
  );

  INSERT INTO student_sink SELECT * FROM student_source;
  ```

  For more details, refer to [Flink JDBC](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba-jdbc-%e8%a1%a8).

- Real-time synchronization can use Flink CDC to read both full and incremental data. For example, using Flink SQL:

  ```sql
  SET 'execution.checkpointing.interval' = '10s';

  CREATE TABLE cdc_mysql_source (
      id INT,
      name VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '127.0.0.1',
      'port' = '3306',
      'username' = 'root',
      'password' = 'password',
      'database-name' = 'database',
      'table-name' = 'table'
  );

  -- Supports synchronization of insert/update/delete events.
  CREATE TABLE doris_sink (
      id INT,
      name STRING
  )
  WITH (
      'connector' = 'doris',
      'fenodes' = '127.0.0.1:8030',
      'table.identifier' = 'database.table',
      'username' = 'root',
      'password' = '',
      'sink.properties.format' = 'json',
      'sink.properties.read_json_by_line' = 'true',
      'sink.enable-delete' = 'true', -- Synchronize delete events.
      'sink.label-prefix' = 'doris_label'
  );

  INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;
  ```

  To synchronize an entire database or multiple tables from a TP database, you can use the full-database synchronization feature of the Flink Doris Connector and set up the whole pipeline with a single job, as shown below:

  ```shell
  <FLINK_HOME>/bin/flink run \
      -Dexecution.checkpointing.interval=10s \
      -Dparallelism.default=1 \
      -c org.apache.doris.flink.tools.cdc.CdcTools \
      lib/flink-doris-connector-1.16-24.0.1.jar \
      mysql-sync-database \
      --database test_db \
      --mysql-conf hostname=127.0.0.1 \
      --mysql-conf port=3306 \
      --mysql-conf username=root \
      --mysql-conf password=123456 \
      --mysql-conf database-name=mysql_db \
      --including-tables "tbl1|test.*" \
      --sink-conf fenodes=127.0.0.1:8030 \
      --sink-conf username=root \
      --sink-conf password=123456 \
      --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
      --sink-conf sink.label-prefix=label \
      --table-conf replication_num=1
  ```

  For more details, refer to [Full Database Synchronization](../../../ecosystem/flink-doris-connector.md#full-database-synchronization).
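The Flink examples above write into Doris tables that must already exist (for example, `test.students`). A minimal sketch of a matching Doris table, assuming the Unique Key model so that CDC upserts and deletes can be applied; the `VARCHAR(256)` length is an illustrative choice:

```sql
CREATE TABLE test.students (
    id INT,
    name VARCHAR(256),
    age INT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ('replication_num' = '1');
```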
## Spark Connector

You can use Spark's JDBC Source together with the Doris Sink of the Spark Doris Connector to write the data:

```scala
val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "password")
    .load()

jdbcDF.write.format("doris")
    .option("doris.table.identifier", "db.table")
    .option("doris.fenodes", "127.0.0.1:8030")
    .option("user", "root")
    .option("password", "")
    .save()
```

For more details, refer to [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) and [Spark Doris Connector](../../../ecosystem/spark-doris-connector.md#batch-write).

## DataX / SeaTunnel / CloudCanal and Other Third-Party Tools

You can also use third-party synchronization tools to migrate the data. For more details, refer to:
- [DataX](../../../ecosystem/datax.md)
- [SeaTunnel](../../../ecosystem/seatunnel.md)
- [CloudCanal](../../../ecosystem/cloudcanal.md)
\ No newline at end of file

diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/data-source/migrate-data-from-other-oltp.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/data-source/migrate-data-from-other-oltp.md
new file mode 100644
index 00000000000..8106a332a90
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/data-source/migrate-data-from-other-oltp.md
@@ -0,0 +1,182 @@
---
{
    "title": "Migrating Data from Other TP Systems",
    "language": "zh-CN"
}
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->


There are several ways to migrate data to Doris from other TP systems such as MySQL, SQL Server, or Oracle.

## Multi-Catalog

Use a Catalog to map the source tables as external tables, then load the data with INSERT INTO or CREATE-TABLE-AS-SELECT statements.

For example, with MySQL:

```sql
CREATE CATALOG mysql_catalog PROPERTIES(
    'type' = 'jdbc',
    'user' = 'root',
    'password' = '123456',
    'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db',
    'driver_url' = 'mysql-connector-java-8.0.25.jar',
    'driver_class' = 'com.mysql.cj.jdbc.Driver'
);

-- Load via INSERT
INSERT INTO internal.doris_db.tbl1
SELECT * FROM mysql_catalog.mysql_db.table1;

-- Load via CTAS
CREATE TABLE internal.doris_db.tbl1
PROPERTIES('replication_num' = '1')
AS
SELECT * FROM mysql_catalog.mysql_db.table1;
```

For more details, refer to [Catalog Data Load](../../../lakehouse/catalog-overview.md#数据导入).
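Because the JDBC external table and the Doris table are visible in the same session, a quick row-count comparison can confirm that the load is complete. A minimal sketch, assuming the catalog and table names from the example above:

```sql
-- Row count in the source, read through the JDBC catalog
SELECT COUNT(*) FROM mysql_catalog.mysql_db.table1;

-- Row count in the Doris target table; the two results should match
SELECT COUNT(*) FROM internal.doris_db.tbl1;
```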
+--> + + +从其他 TP 系统,如 MySQL/SqlServer/Oracle 等,迁移数据到 Doris,可以有多种方式。 + +## Multi-Catalog + +使用 Catalog 映射为外表,然后使用 INSERT INTO 或者 CREATE-TABLE-AS-SELECT 语句,完成数据导入。 + +以 MySQL 为例: +```sql +CREATE CATALOG mysql_catalog properties( + 'type' = 'jdbc', + 'user' = 'root', + 'password' = '123456', + 'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db', + 'driver_url' = 'mysql-connector-java-8.0.25.jar', + 'driver_class' = 'com.mysql.cj.jdbc.Driver' +); + +-- 通过 insert 导入 +INSERT INTO internal.doris_db.tbl1 +SELECT * FROM iceberg_catalog.iceberg_db.table1; + +-- 通过 ctas 导入 +CREATE TABLE internal.doris_db.tbl1 +PROPERTIES('replication_num' = '1') +AS +SELECT * FROM iceberg_catalog.iceberg_db.table1; +``` + +具体可参考 [Catalog 数据导入](../../../lakehouse/catalog-overview.md#数据导入)。 + +## Flink Doris Connector + +可以借助于 Flink 完成 TP 系统的离线和实时同步。 + +- 离线同步可以使用 Flink 的 JDBC Source 和 Doris Sink 完成数据的导入,以 FlinkSQL 为例: + ```sql + CREATE TABLE student_source ( + id INT, + name STRING, + age INT + PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://localhost:3306/mydatabase', + 'table-name' = 'students', + 'username' = 'username', + 'password' = 'password', + ); + + CREATE TABLE student_sink ( + id INT, + name STRING, + age INT + ) + WITH ( + 'connector' = 'doris', + 'fenodes' = '127.0.0.1:8030', + 'table.identifier' = 'test.students', + 'username' = 'root', + 'password' = 'password', + 'sink.label-prefix' = 'doris_label' + ); + + INSERT into student_sink select * from student_source; + ``` + 具体可参考 [Flink JDBC](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba-jdbc-%e8%a1%a8)。 + +- 实时同步可以借助 FlinkCDC,完成全量和增量数据的读取,以 FlinkSQL 为例: + ```sql + SET 'execution.checkpointing.interval' = '10s'; + + CREATE TABLE cdc_mysql_source ( + id int + ,name VARCHAR + ,PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = '127.0.0.1', + 'port' = '3306', + 'username' = 'root', + 'password' = 'password', + 'database-name' = 'database', + 'table-name' = 'table' + ); + + -- 支持同步 insert/update/delete 事件 + CREATE TABLE doris_sink ( + id INT, + name STRING + ) + WITH ( + 'connector' = 'doris', + 'fenodes' = '127.0.0.1:8030', + 'table.identifier' = 'database.table', + 'username' = 'root', + 'password' = '', + 'sink.properties.format' = 'json', + 'sink.properties.read_json_by_line' = 'true', + 'sink.enable-delete' = 'true', -- 同步删除事件 + 'sink.label-prefix' = 'doris_label' + ); + + insert into doris_sink select id,name from cdc_mysql_source; + ``` + + 同时对于 TP 数据库中 整库或者多表的同步操作,可以使用 Flink Doris Connector 提供的整库同步功能,一键完成 TP 数据库的写入,如: + ```shell + <FLINK_HOME>bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1 \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + lib/flink-doris-connector-1.16-24.0.1.jar \ + mysql-sync-database \ + --database test_db \ + --mysql-conf hostname=127.0.0.1 \ + --mysql-conf port=3306 \ + --mysql-conf username=root \ + --mysql-conf password=123456 \ + --mysql-conf database-name=mysql_db \ + --including-tables "tbl1|test.*" \ + --sink-conf fenodes=127.0.0.1:8030 \ + --sink-conf username=root \ + --sink-conf password=123456 \ + --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \ + --sink-conf sink.label-prefix=label \ + --table-conf replication_num=1 + ``` + 具体可参考:[整库同步](../../../ecosystem/flink-doris-connector.md#整库同步) + +## Spark Connector +可以通过 Spark Connector 的 JDBC Source 和 Doris Sink 完成数据的写入。 +```java +val jdbcDF = spark.read + .format("jdbc") + .option("url", 
"jdbc:postgresql:dbserver") + .option("dbtable", "schema.tablename") + .option("user", "username") + .option("password", "password") + .load() + + jdbcDF.write.format("doris") + .option("doris.table.identifier", "db.table") + .option("doris.fenodes", "127.0.0.1:8030") + .option("user", "root") + .option("password", "") + .save() +``` +具体可参考:[JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html),[Spark Doris Connector](../../../ecosystem//spark-doris-connector.md#批量写入) + +## DataX / Seatunnel / CloudCanal 等三方工具 + +除此之外,也可以使用第三方同步工具来进行数据同步,更多可参考: +- [DataX](../../../ecosystem/datax.md) +- [Seatunnel](../../../ecosystem/seatunnel.md) +- [CloudCanal](../../../ecosystem/cloudcanal.md) \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/data-source/migrate-data-from-other-oltp.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/data-source/migrate-data-from-other-oltp.md new file mode 100644 index 00000000000..8106a332a90 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/data-source/migrate-data-from-other-oltp.md @@ -0,0 +1,182 @@ +--- +{ + "title": "从其他 TP 系统迁移数据", + "language": "zh-CN" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + + +从其他 TP 系统,如 MySQL/SqlServer/Oracle 等,迁移数据到 Doris,可以有多种方式。 + +## Multi-Catalog + +使用 Catalog 映射为外表,然后使用 INSERT INTO 或者 CREATE-TABLE-AS-SELECT 语句,完成数据导入。 + +以 MySQL 为例: +```sql +CREATE CATALOG mysql_catalog properties( + 'type' = 'jdbc', + 'user' = 'root', + 'password' = '123456', + 'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db', + 'driver_url' = 'mysql-connector-java-8.0.25.jar', + 'driver_class' = 'com.mysql.cj.jdbc.Driver' +); + +-- 通过 insert 导入 +INSERT INTO internal.doris_db.tbl1 +SELECT * FROM iceberg_catalog.iceberg_db.table1; + +-- 通过 ctas 导入 +CREATE TABLE internal.doris_db.tbl1 +PROPERTIES('replication_num' = '1') +AS +SELECT * FROM iceberg_catalog.iceberg_db.table1; +``` + +具体可参考 [Catalog 数据导入](../../../lakehouse/catalog-overview.md#数据导入)。 + +## Flink Doris Connector + +可以借助于 Flink 完成 TP 系统的离线和实时同步。 + +- 离线同步可以使用 Flink 的 JDBC Source 和 Doris Sink 完成数据的导入,以 FlinkSQL 为例: + ```sql + CREATE TABLE student_source ( + id INT, + name STRING, + age INT + PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://localhost:3306/mydatabase', + 'table-name' = 'students', + 'username' = 'username', + 'password' = 'password', + ); + + CREATE TABLE student_sink ( + id INT, + name STRING, + age INT + ) + WITH ( + 'connector' = 'doris', + 'fenodes' = '127.0.0.1:8030', + 'table.identifier' = 'test.students', + 'username' = 'root', + 'password' = 'password', + 'sink.label-prefix' = 'doris_label' + ); + + INSERT into student_sink select * from student_source; + ``` + 具体可参考 [Flink JDBC](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba-jdbc-%e8%a1%a8)。 + +- 实时同步可以借助 FlinkCDC,完成全量和增量数据的读取,以 FlinkSQL 为例: + ```sql + SET 'execution.checkpointing.interval' = '10s'; + + CREATE TABLE cdc_mysql_source ( + id int + ,name VARCHAR + ,PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = '127.0.0.1', + 'port' = '3306', + 'username' = 'root', + 'password' = 'password', + 'database-name' = 'database', + 'table-name' = 'table' + ); + + -- 支持同步 insert/update/delete 事件 + CREATE TABLE doris_sink ( + id INT, + name STRING + ) + WITH ( + 'connector' = 'doris', + 'fenodes' = '127.0.0.1:8030', + 'table.identifier' = 'database.table', + 'username' = 'root', + 'password' = '', + 'sink.properties.format' = 'json', + 'sink.properties.read_json_by_line' = 'true', + 'sink.enable-delete' = 'true', -- 同步删除事件 + 'sink.label-prefix' = 'doris_label' + ); + + insert into doris_sink select id,name from cdc_mysql_source; + ``` + + 同时对于 TP 数据库中 整库或者多表的同步操作,可以使用 Flink Doris Connector 提供的整库同步功能,一键完成 TP 数据库的写入,如: + ```shell + <FLINK_HOME>bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1 \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + lib/flink-doris-connector-1.16-24.0.1.jar \ + mysql-sync-database \ + --database test_db \ + --mysql-conf hostname=127.0.0.1 \ + --mysql-conf port=3306 \ + --mysql-conf username=root \ + --mysql-conf password=123456 \ + --mysql-conf database-name=mysql_db \ + --including-tables "tbl1|test.*" \ + --sink-conf fenodes=127.0.0.1:8030 \ + --sink-conf username=root \ + --sink-conf password=123456 \ + --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \ + --sink-conf sink.label-prefix=label \ + --table-conf replication_num=1 + ``` + 具体可参考:[整库同步](../../../ecosystem/flink-doris-connector.md#整库同步) + +## Spark Connector +可以通过 Spark Connector 的 JDBC Source 和 Doris Sink 完成数据的写入。 +```java +val jdbcDF = spark.read + .format("jdbc") + .option("url", 
"jdbc:postgresql:dbserver") + .option("dbtable", "schema.tablename") + .option("user", "username") + .option("password", "password") + .load() + + jdbcDF.write.format("doris") + .option("doris.table.identifier", "db.table") + .option("doris.fenodes", "127.0.0.1:8030") + .option("user", "root") + .option("password", "") + .save() +``` +具体可参考:[JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html),[Spark Doris Connector](../../../ecosystem//spark-doris-connector.md#批量写入) + +## DataX / Seatunnel / CloudCanal 等三方工具 + +除此之外,也可以使用第三方同步工具来进行数据同步,更多可参考: +- [DataX](../../../ecosystem/datax.md) +- [Seatunnel](../../../ecosystem/seatunnel.md) +- [CloudCanal](../../../ecosystem/cloudcanal.md) \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/data-source/migrate-data-from-other-oltp.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/data-source/migrate-data-from-other-oltp.md new file mode 100644 index 00000000000..8106a332a90 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/data-source/migrate-data-from-other-oltp.md @@ -0,0 +1,182 @@ +--- +{ + "title": "从其他 TP 系统迁移数据", + "language": "zh-CN" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + + +从其他 TP 系统,如 MySQL/SqlServer/Oracle 等,迁移数据到 Doris,可以有多种方式。 + +## Multi-Catalog + +使用 Catalog 映射为外表,然后使用 INSERT INTO 或者 CREATE-TABLE-AS-SELECT 语句,完成数据导入。 + +以 MySQL 为例: +```sql +CREATE CATALOG mysql_catalog properties( + 'type' = 'jdbc', + 'user' = 'root', + 'password' = '123456', + 'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db', + 'driver_url' = 'mysql-connector-java-8.0.25.jar', + 'driver_class' = 'com.mysql.cj.jdbc.Driver' +); + +-- 通过 insert 导入 +INSERT INTO internal.doris_db.tbl1 +SELECT * FROM iceberg_catalog.iceberg_db.table1; + +-- 通过 ctas 导入 +CREATE TABLE internal.doris_db.tbl1 +PROPERTIES('replication_num' = '1') +AS +SELECT * FROM iceberg_catalog.iceberg_db.table1; +``` + +具体可参考 [Catalog 数据导入](../../../lakehouse/catalog-overview.md#数据导入)。 + +## Flink Doris Connector + +可以借助于 Flink 完成 TP 系统的离线和实时同步。 + +- 离线同步可以使用 Flink 的 JDBC Source 和 Doris Sink 完成数据的导入,以 FlinkSQL 为例: + ```sql + CREATE TABLE student_source ( + id INT, + name STRING, + age INT + PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://localhost:3306/mydatabase', + 'table-name' = 'students', + 'username' = 'username', + 'password' = 'password', + ); + + CREATE TABLE student_sink ( + id INT, + name STRING, + age INT + ) + WITH ( + 'connector' = 'doris', + 'fenodes' = '127.0.0.1:8030', + 'table.identifier' = 'test.students', + 'username' = 'root', + 'password' = 'password', + 'sink.label-prefix' = 'doris_label' + ); + + INSERT into student_sink select * from student_source; + ``` + 具体可参考 [Flink JDBC](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba-jdbc-%e8%a1%a8)。 + +- 实时同步可以借助 FlinkCDC,完成全量和增量数据的读取,以 FlinkSQL 为例: + ```sql + SET 'execution.checkpointing.interval' = '10s'; + + CREATE TABLE cdc_mysql_source ( + id int + ,name VARCHAR + ,PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = '127.0.0.1', + 'port' = '3306', + 'username' = 'root', + 'password' = 'password', + 'database-name' = 'database', + 'table-name' = 'table' + ); + + -- 支持同步 insert/update/delete 事件 + CREATE TABLE doris_sink ( + id INT, + name STRING + ) + WITH ( + 'connector' = 'doris', + 'fenodes' = '127.0.0.1:8030', + 'table.identifier' = 'database.table', + 'username' = 'root', + 'password' = '', + 'sink.properties.format' = 'json', + 'sink.properties.read_json_by_line' = 'true', + 'sink.enable-delete' = 'true', -- 同步删除事件 + 'sink.label-prefix' = 'doris_label' + ); + + insert into doris_sink select id,name from cdc_mysql_source; + ``` + + 同时对于 TP 数据库中 整库或者多表的同步操作,可以使用 Flink Doris Connector 提供的整库同步功能,一键完成 TP 数据库的写入,如: + ```shell + <FLINK_HOME>bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1 \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + lib/flink-doris-connector-1.16-24.0.1.jar \ + mysql-sync-database \ + --database test_db \ + --mysql-conf hostname=127.0.0.1 \ + --mysql-conf port=3306 \ + --mysql-conf username=root \ + --mysql-conf password=123456 \ + --mysql-conf database-name=mysql_db \ + --including-tables "tbl1|test.*" \ + --sink-conf fenodes=127.0.0.1:8030 \ + --sink-conf username=root \ + --sink-conf password=123456 \ + --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \ + --sink-conf sink.label-prefix=label \ + --table-conf replication_num=1 + ``` + 具体可参考:[整库同步](../../../ecosystem/flink-doris-connector.md#整库同步) + +## Spark Connector +可以通过 Spark Connector 的 JDBC Source 和 Doris Sink 完成数据的写入。 +```java +val jdbcDF = spark.read + .format("jdbc") + .option("url", 
"jdbc:postgresql:dbserver") + .option("dbtable", "schema.tablename") + .option("user", "username") + .option("password", "password") + .load() + + jdbcDF.write.format("doris") + .option("doris.table.identifier", "db.table") + .option("doris.fenodes", "127.0.0.1:8030") + .option("user", "root") + .option("password", "") + .save() +``` +具体可参考:[JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html),[Spark Doris Connector](../../../ecosystem//spark-doris-connector.md#批量写入) + +## DataX / Seatunnel / CloudCanal 等三方工具 + +除此之外,也可以使用第三方同步工具来进行数据同步,更多可参考: +- [DataX](../../../ecosystem/datax.md) +- [Seatunnel](../../../ecosystem/seatunnel.md) +- [CloudCanal](../../../ecosystem/cloudcanal.md) \ No newline at end of file diff --git a/sidebars.json b/sidebars.json index 34fddebade6..1ebce911ccf 100644 --- a/sidebars.json +++ b/sidebars.json @@ -158,7 +158,8 @@ "data-operate/import/data-source/tencent-cos", "data-operate/import/data-source/minio", "data-operate/import/data-source/s3-compatible", - "data-operate/import/data-source/migrate-data-from-other-olap" + "data-operate/import/data-source/migrate-data-from-other-olap", + "data-operate/import/data-source/migrate-data-from-other-oltp" ] }, { diff --git a/versioned_docs/version-2.1/data-operate/import/data-source/migrate-data-from-other-oltp.md b/versioned_docs/version-2.1/data-operate/import/data-source/migrate-data-from-other-oltp.md new file mode 100644 index 00000000000..6f571b8dda9 --- /dev/null +++ b/versioned_docs/version-2.1/data-operate/import/data-source/migrate-data-from-other-oltp.md @@ -0,0 +1,182 @@ +--- +{ + "title": "Migrating Data from Other OLTP", + "language": "en" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + + +There are various ways to migrate data from other TP systems, such as MySQL/SqlServer/Oracle, to Doris. + +## Multi-Catalog + +Use the Catalog to map as an external table, and then use the INSERT INTO or CREATE-TABLE-AS-SELECT statements to complete the data load. + +For example, with MySQL: +```sql +CREATE CATALOG mysql_catalog properties( + 'type' = 'jdbc', + 'user' = 'root', + 'password' = '123456', + 'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db', + 'driver_url' = 'mysql-connector-java-8.0.25.jar', + 'driver_class' = 'com.mysql.cj.jdbc.Driver' +); + +-- Load via INSERT +INSERT INTO internal.doris_db.tbl1 +SELECT * FROM iceberg_catalog.iceberg_db.table1; + +-- Load via CTAS +CREATE TABLE internal.doris_db.tbl1 +PROPERTIES('replication_num' = '1') +AS +SELECT * FROM iceberg_catalog.iceberg_db.table1; +``` + +For more details, refer to [Catalog Data Load](../../../lakehouse/catalog-overview.md#data-import)。 + +## Flink Doris Connector + +You can leverage Flink to achieve offline and real-time synchronization for TP systems. 
## Flink Doris Connector

You can use Flink for both offline and real-time synchronization of TP systems.

- Offline synchronization can use Flink's JDBC Source and the Doris Sink to load the data. The Doris sink table must be created beforehand (see the sketch after this list). For example, using Flink SQL:

  ```sql
  CREATE TABLE student_source (
      id INT,
      name STRING,
      age INT,
      PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'students',
      'username' = 'username',
      'password' = 'password'
  );

  CREATE TABLE student_sink (
      id INT,
      name STRING,
      age INT
  )
  WITH (
      'connector' = 'doris',
      'fenodes' = '127.0.0.1:8030',
      'table.identifier' = 'test.students',
      'username' = 'root',
      'password' = 'password',
      'sink.label-prefix' = 'doris_label'
  );

  INSERT INTO student_sink SELECT * FROM student_source;
  ```

  For more details, refer to [Flink JDBC](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba-jdbc-%e8%a1%a8).

- Real-time synchronization can use Flink CDC to read both full and incremental data. For example, using Flink SQL:

  ```sql
  SET 'execution.checkpointing.interval' = '10s';

  CREATE TABLE cdc_mysql_source (
      id INT,
      name VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '127.0.0.1',
      'port' = '3306',
      'username' = 'root',
      'password' = 'password',
      'database-name' = 'database',
      'table-name' = 'table'
  );

  -- Supports synchronization of insert/update/delete events.
  CREATE TABLE doris_sink (
      id INT,
      name STRING
  )
  WITH (
      'connector' = 'doris',
      'fenodes' = '127.0.0.1:8030',
      'table.identifier' = 'database.table',
      'username' = 'root',
      'password' = '',
      'sink.properties.format' = 'json',
      'sink.properties.read_json_by_line' = 'true',
      'sink.enable-delete' = 'true', -- Synchronize delete events.
      'sink.label-prefix' = 'doris_label'
  );

  INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;
  ```

  To synchronize an entire database or multiple tables from a TP database, you can use the full-database synchronization feature of the Flink Doris Connector and set up the whole pipeline with a single job, as shown below:

  ```shell
  <FLINK_HOME>/bin/flink run \
      -Dexecution.checkpointing.interval=10s \
      -Dparallelism.default=1 \
      -c org.apache.doris.flink.tools.cdc.CdcTools \
      lib/flink-doris-connector-1.16-24.0.1.jar \
      mysql-sync-database \
      --database test_db \
      --mysql-conf hostname=127.0.0.1 \
      --mysql-conf port=3306 \
      --mysql-conf username=root \
      --mysql-conf password=123456 \
      --mysql-conf database-name=mysql_db \
      --including-tables "tbl1|test.*" \
      --sink-conf fenodes=127.0.0.1:8030 \
      --sink-conf username=root \
      --sink-conf password=123456 \
      --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
      --sink-conf sink.label-prefix=label \
      --table-conf replication_num=1
  ```

  For more details, refer to [Full Database Synchronization](../../../ecosystem/flink-doris-connector.md#full-database-synchronization).
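The Flink examples above write into Doris tables that must already exist (for example, `test.students`). A minimal sketch of a matching Doris table, assuming the Unique Key model so that CDC upserts and deletes can be applied; the `VARCHAR(256)` length is an illustrative choice:

```sql
CREATE TABLE test.students (
    id INT,
    name VARCHAR(256),
    age INT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ('replication_num' = '1');
```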
## Spark Connector

You can use Spark's JDBC Source together with the Doris Sink of the Spark Doris Connector to write the data:

```scala
val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "password")
    .load()

jdbcDF.write.format("doris")
    .option("doris.table.identifier", "db.table")
    .option("doris.fenodes", "127.0.0.1:8030")
    .option("user", "root")
    .option("password", "")
    .save()
```

For more details, refer to [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) and [Spark Doris Connector](../../../ecosystem/spark-doris-connector.md#batch-write).

## DataX / SeaTunnel / CloudCanal and Other Third-Party Tools

You can also use third-party synchronization tools to migrate the data. For more details, refer to:
- [DataX](../../../ecosystem/datax.md)
- [SeaTunnel](../../../ecosystem/seatunnel.md)
- [CloudCanal](../../../ecosystem/cloudcanal.md)
\ No newline at end of file

diff --git a/versioned_docs/version-3.0/data-operate/import/data-source/migrate-data-from-other-oltp.md b/versioned_docs/version-3.0/data-operate/import/data-source/migrate-data-from-other-oltp.md
new file mode 100644
index 00000000000..6f571b8dda9
--- /dev/null
+++ b/versioned_docs/version-3.0/data-operate/import/data-source/migrate-data-from-other-oltp.md
@@ -0,0 +1,182 @@
---
{
    "title": "Migrating Data from Other OLTP",
    "language": "en"
}
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->


There are several ways to migrate data from other TP systems, such as MySQL, SQL Server, or Oracle, to Doris.

## Multi-Catalog

Use a Catalog to map the source tables as external tables, then load the data with INSERT INTO or CREATE-TABLE-AS-SELECT statements.

For example, with MySQL:

```sql
CREATE CATALOG mysql_catalog PROPERTIES(
    'type' = 'jdbc',
    'user' = 'root',
    'password' = '123456',
    'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db',
    'driver_url' = 'mysql-connector-java-8.0.25.jar',
    'driver_class' = 'com.mysql.cj.jdbc.Driver'
);

-- Load via INSERT
INSERT INTO internal.doris_db.tbl1
SELECT * FROM mysql_catalog.mysql_db.table1;

-- Load via CTAS
CREATE TABLE internal.doris_db.tbl1
PROPERTIES('replication_num' = '1')
AS
SELECT * FROM mysql_catalog.mysql_db.table1;
```

For more details, refer to [Catalog Data Load](../../../lakehouse/catalog-overview.md#data-import).
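Because the JDBC external table and the Doris table are visible in the same session, a quick row-count comparison can confirm that the load is complete. A minimal sketch, assuming the catalog and table names from the example above:

```sql
-- Row count in the source, read through the JDBC catalog
SELECT COUNT(*) FROM mysql_catalog.mysql_db.table1;

-- Row count in the Doris target table; the two results should match
SELECT COUNT(*) FROM internal.doris_db.tbl1;
```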
## Flink Doris Connector

You can use Flink for both offline and real-time synchronization of TP systems.

- Offline synchronization can use Flink's JDBC Source and the Doris Sink to load the data. The Doris sink table must be created beforehand (see the sketch after this list). For example, using Flink SQL:

  ```sql
  CREATE TABLE student_source (
      id INT,
      name STRING,
      age INT,
      PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'students',
      'username' = 'username',
      'password' = 'password'
  );

  CREATE TABLE student_sink (
      id INT,
      name STRING,
      age INT
  )
  WITH (
      'connector' = 'doris',
      'fenodes' = '127.0.0.1:8030',
      'table.identifier' = 'test.students',
      'username' = 'root',
      'password' = 'password',
      'sink.label-prefix' = 'doris_label'
  );

  INSERT INTO student_sink SELECT * FROM student_source;
  ```

  For more details, refer to [Flink JDBC](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba-jdbc-%e8%a1%a8).

- Real-time synchronization can use Flink CDC to read both full and incremental data. For example, using Flink SQL:

  ```sql
  SET 'execution.checkpointing.interval' = '10s';

  CREATE TABLE cdc_mysql_source (
      id INT,
      name VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '127.0.0.1',
      'port' = '3306',
      'username' = 'root',
      'password' = 'password',
      'database-name' = 'database',
      'table-name' = 'table'
  );

  -- Supports synchronization of insert/update/delete events.
  CREATE TABLE doris_sink (
      id INT,
      name STRING
  )
  WITH (
      'connector' = 'doris',
      'fenodes' = '127.0.0.1:8030',
      'table.identifier' = 'database.table',
      'username' = 'root',
      'password' = '',
      'sink.properties.format' = 'json',
      'sink.properties.read_json_by_line' = 'true',
      'sink.enable-delete' = 'true', -- Synchronize delete events.
      'sink.label-prefix' = 'doris_label'
  );

  INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;
  ```

  To synchronize an entire database or multiple tables from a TP database, you can use the full-database synchronization feature of the Flink Doris Connector and set up the whole pipeline with a single job, as shown below:

  ```shell
  <FLINK_HOME>/bin/flink run \
      -Dexecution.checkpointing.interval=10s \
      -Dparallelism.default=1 \
      -c org.apache.doris.flink.tools.cdc.CdcTools \
      lib/flink-doris-connector-1.16-24.0.1.jar \
      mysql-sync-database \
      --database test_db \
      --mysql-conf hostname=127.0.0.1 \
      --mysql-conf port=3306 \
      --mysql-conf username=root \
      --mysql-conf password=123456 \
      --mysql-conf database-name=mysql_db \
      --including-tables "tbl1|test.*" \
      --sink-conf fenodes=127.0.0.1:8030 \
      --sink-conf username=root \
      --sink-conf password=123456 \
      --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
      --sink-conf sink.label-prefix=label \
      --table-conf replication_num=1
  ```

  For more details, refer to [Full Database Synchronization](../../../ecosystem/flink-doris-connector.md#full-database-synchronization).
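The Flink examples above write into Doris tables that must already exist (for example, `test.students`). A minimal sketch of a matching Doris table, assuming the Unique Key model so that CDC upserts and deletes can be applied; the `VARCHAR(256)` length is an illustrative choice:

```sql
CREATE TABLE test.students (
    id INT,
    name VARCHAR(256),
    age INT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ('replication_num' = '1');
```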
## Spark Connector

You can use Spark's JDBC Source together with the Doris Sink of the Spark Doris Connector to write the data:

```scala
val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "password")
    .load()

jdbcDF.write.format("doris")
    .option("doris.table.identifier", "db.table")
    .option("doris.fenodes", "127.0.0.1:8030")
    .option("user", "root")
    .option("password", "")
    .save()
```

For more details, refer to [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) and [Spark Doris Connector](../../../ecosystem/spark-doris-connector.md#batch-write).

## DataX / SeaTunnel / CloudCanal and Other Third-Party Tools

You can also use third-party synchronization tools to migrate the data. For more details, refer to:
- [DataX](../../../ecosystem/datax.md)
- [SeaTunnel](../../../ecosystem/seatunnel.md)
- [CloudCanal](../../../ecosystem/cloudcanal.md)
\ No newline at end of file

diff --git a/versioned_sidebars/version-2.1-sidebars.json b/versioned_sidebars/version-2.1-sidebars.json
index 7fd5eafe16b..e62925354ba 100644
--- a/versioned_sidebars/version-2.1-sidebars.json
+++ b/versioned_sidebars/version-2.1-sidebars.json
@@ -134,7 +134,8 @@
                 "data-operate/import/data-source/tencent-cos",
                 "data-operate/import/data-source/minio",
                 "data-operate/import/data-source/s3-compatible",
-                "data-operate/import/data-source/migrate-data-from-other-olap"
+                "data-operate/import/data-source/migrate-data-from-other-olap",
+                "data-operate/import/data-source/migrate-data-from-other-oltp"
             ]
         },
         {

diff --git a/versioned_sidebars/version-3.0-sidebars.json b/versioned_sidebars/version-3.0-sidebars.json
index c4a05463709..039644303be 100644
--- a/versioned_sidebars/version-3.0-sidebars.json
+++ b/versioned_sidebars/version-3.0-sidebars.json
@@ -158,7 +158,8 @@
                 "data-operate/import/data-source/tencent-cos",
                 "data-operate/import/data-source/minio",
                 "data-operate/import/data-source/s3-compatible",
-                "data-operate/import/data-source/migrate-data-from-other-olap"
+                "data-operate/import/data-source/migrate-data-from-other-olap",
+                "data-operate/import/data-source/migrate-data-from-other-oltp"
             ]
         },
         {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org