This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 9a9905b  [Feature]add mysql database sync (#141)
9a9905b is described below

commit 9a9905bbaad25f982f9b08c370ba45118299aa57
Author: wudi <676366...@qq.com>
AuthorDate: Mon Jun 5 11:08:15 2023 +0800

    [Feature]add mysql database sync (#141)
---
 flink-doris-connector/build.sh                     |   9 +-
 flink-doris-connector/pom.xml                      |  32 ++-
 .../apache/doris/flink/catalog/DorisCatalog.java   |   3 +
 .../doris/flink/catalog/doris/DataModel.java       |  23 +++
 .../doris/flink/catalog/doris/DorisSystem.java     | 230 +++++++++++++++++++++
 .../doris/flink/catalog/doris/DorisType.java       |  42 ++++
 .../doris/flink/catalog/doris/FieldSchema.java     |  56 +++++
 .../doris/flink/catalog/doris/TableSchema.java     |  97 +++++++++
 .../doris/flink/cfg/DorisExecutionOptions.java     |   2 +-
 .../flink/exception/CreateTableException.java      |  45 ++++
 .../doris/flink/sink/writer/DorisWriter.java       |  11 +-
 .../doris/flink/table/DorisConfigOptions.java      |  14 ++
 .../org/apache/doris/flink/tools/cdc/CdcTools.java | 101 +++++++++
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 217 +++++++++++++++++++
 .../flink/tools/cdc}/DateToStringConverter.java    |  10 +-
 .../apache/doris/flink/tools/cdc/SourceSchema.java | 104 ++++++++++
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   | 192 +++++++++++++++++
 .../doris/flink/tools/cdc/mysql/MysqlType.java     | 168 +++++++++++++++
 .../tools/cdc/mysql/ParsingProcessFunction.java    |  65 ++++++
 .../apache/doris/flink/CDCSchemaChangeExample.java |   5 +-
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |  73 +++++++
 .../doris/flink/utils/DateToStringConverter.java   |   2 +-
 22 files changed, 1466 insertions(+), 35 deletions(-)

diff --git a/flink-doris-connector/build.sh b/flink-doris-connector/build.sh
index 9dbe69d..5816e86 100755
--- a/flink-doris-connector/build.sh
+++ b/flink-doris-connector/build.sh
@@ -147,10 +147,15 @@ elif [ ${flinkVer} -eq 3 ]; then
     FLINK_VERSION="1.17.0"
 fi

-echo_g " flink version: ${FLINK_VERSION}"
+# extract minor version:
+# eg: 1.17.0 -> 1.17
+FLINK_MINOR_VERSION=0
+[ ${FLINK_VERSION} != 0 ] && FLINK_MINOR_VERSION=${FLINK_VERSION%.*}
+
+echo_g " flink version: ${FLINK_VERSION}, minor version: ${FLINK_MINOR_VERSION}"
 echo_g " build starting..."

-${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} "$@"
+${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.minor.version=${FLINK_MINOR_VERSION} "$@"

 EXIT_CODE=$?
 if [ $EXIT_CODE -eq 0 ]; then
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 0d0e6c4..025bcde 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -26,7 +26,7 @@ under the License.
         <version>23</version>
     </parent>
     <groupId>org.apache.doris</groupId>
-    <artifactId>flink-doris-connector</artifactId>
+    <artifactId>flink-doris-connector-${flink.minor.version}</artifactId>
     <version>1.4.0-SNAPSHOT</version>
     <name>Flink Doris Connector</name>
     <url>https://doris.apache.org/</url>
@@ -68,6 +68,7 @@ under the License.

     <properties>
         <flink.version>1.15.0</flink.version>
+        <flink.minor.version>1.15</flink.minor.version>
         <libthrift.version>0.16.0</libthrift.version>
         <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
@@ -237,14 +238,20 @@ under the License.
<groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> - <scope>test</scope> + <scope>provided</scope> </dependency> - + <!-- use cdc bundled jar for kafka connect class--> <dependency> <groupId>com.ververica</groupId> - <artifactId>flink-connector-mysql-cdc</artifactId> + <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>2.3.0</version> - <scope>test</scope> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime-web</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> </dependency> </dependencies> @@ -393,19 +400,6 @@ under the License. </build> <profiles> - - <profile> - <id>thirdparty</id> - <activation> - <property> - <name>env.DORIS_THIRDPARTY</name> - </property> - </activation> - <properties> - <doris.thirdparty>${env.DORIS_THIRDPARTY}</doris.thirdparty> - </properties> - </profile> - <!-- for custom internal repository --> <profile> <id>custom-env</id> @@ -478,4 +472,4 @@ under the License. </profile> </profiles> -</project> \ No newline at end of file +</project> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java index 3b5834f..ef735e5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java @@ -81,6 +81,9 @@ import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.util.Preconditions.checkArgument; +/** + * catalog for flink + */ public class DorisCatalog extends AbstractCatalog { private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java new file mode 100644 index 0000000..03b87a0 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.catalog.doris;
+
+public enum DataModel {
+    DUPLICATE,
+    UNIQUE,
+    AGGREGATE
+}
\ No newline at end of file
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
new file mode 100644
index 0000000..36aba1c
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.apache.doris.flink.connection.JdbcConnectionProvider;
+import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
+import org.apache.doris.flink.exception.CreateTableException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Doris system operations (database and table DDL).
+ */
+@Public
+public class DorisSystem {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class);
+    private JdbcConnectionProvider jdbcConnectionProvider;
+    private static final List<String> builtinDatabases = Arrays.asList("information_schema");
+
+    public DorisSystem(DorisConnectionOptions options) {
+        this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
+    }
+
+    public List<String> listDatabases() throws Exception {
+        return extractColumnValuesBySQL(
+                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
+                1,
+                dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    public boolean databaseExists(String database) throws Exception {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
+        return listDatabases().contains(database);
+    }
+
+    public boolean createDatabase(String database) throws Exception {
+        execute(String.format("CREATE DATABASE %s", database));
+        return true;
+    }
+
+    public boolean tableExists(String database, String table){
+        try {
+            return databaseExists(database)
+                    && listTables(database).contains(table);
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public List<String> listTables(String databaseName) throws Exception {
+        if (!databaseExists(databaseName)) {
+            throw new DorisRuntimeException("database " + databaseName + " does not exist");
+        }
+        return extractColumnValuesBySQL(
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    public void createTable(TableSchema schema) throws Exception {
+        String ddl = buildCreateTableDDL(schema);
+        LOG.info("Create table with ddl:{}", ddl);
+        execute(ddl);
+    }
+
+    public void execute(String sql) throws Exception {
+        Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(sql);
+        }
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String sql,
+            int columnIndex,
+            Predicate<String> filterFunc,
+            Object... params) throws Exception {
+
+        Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
+        List<String> columnValues = Lists.newArrayList();
+
+        try (PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "The following SQL query could not be executed: %s", sql),
+                    e);
+        }
+    }
+
+    public String buildCreateTableDDL(TableSchema schema) {
+        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
+        sb.append(identifier(schema.getDatabase()))
+                .append(".")
+                .append(identifier(schema.getTable()))
+                .append("(");
+
+        Map<String, FieldSchema> fields = schema.getFields();
+        List<String> keys = schema.getKeys();
+        //append keys
+        for(String key : keys){
+            if(!fields.containsKey(key)){
+                throw new CreateTableException("key " + key + " not found in column list");
+            }
+            FieldSchema field = fields.get(key);
+            buildColumn(sb, field);
+        }
+
+        //append values
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            if(keys.contains(entry.getKey())){
+                continue;
+            }
+            FieldSchema field = entry.getValue();
+            buildColumn(sb, field);
+
+        }
+        sb = sb.deleteCharAt(sb.length() -1);
+        sb.append(" ) ");
+        //append model
+        sb.append(schema.getModel().name())
+                .append(" KEY(")
+                .append(String.join(",", identifier(schema.getKeys())))
+                .append(")");
+
+        //append table comment
+        if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){
+            sb.append(" COMMENT '")
+                    .append(schema.getTableComment())
+                    .append("' ");
+        }
+
+        //append distribute key
+        sb.append(" DISTRIBUTED BY HASH(")
+                .append(String.join(",", identifier(schema.getDistributeKeys())))
+                .append(") BUCKETS AUTO ");
+
+        //append properties
+        int index = 0;
+        for (Map.Entry<String, String> entry : schema.getProperties().entrySet()) {
+            if (index == 0) {
+                sb.append(" PROPERTIES (");
+            }
+            if (index > 0) {
+                sb.append(",");
+            }
+            sb.append(quoteProperties(entry.getKey()))
+                    .append("=")
+                    .append(quoteProperties(entry.getValue()));
+            index++;
+
+            if (index == schema.getProperties().size()) {
+                sb.append(")");
+            }
+        }
+        return sb.toString();
+    }
+
+    private void buildColumn(StringBuilder sql, FieldSchema field){
+        sql.append(identifier(field.getName()))
+                .append(" ")
+                .append(field.getTypeString())
+                .append(" 
COMMENT '") + .append(field.getComment()) + .append("',"); + } + + private List<String> identifier(List<String> name) { + List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList()); + return result; + } + + private String identifier(String name) { + return "`" + name + "`"; + } + + private String quoteProperties(String name) { + return "'" + name + "'"; + } + +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java new file mode 100644 index 0000000..1e43ac1 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.catalog.doris; + +public class DorisType { + public static final String BOOLEAN = "BOOLEAN"; + public static final String TINYINT = "TINYINT"; + public static final String SMALLINT = "SMALLINT"; + public static final String INT = "INT"; + public static final String BIGINT = "BIGINT"; + public static final String LARGEINT = "LARGEINT"; + public static final String FLOAT = "FLOAT"; + public static final String DOUBLE = "DOUBLE"; + public static final String DECIMAL = "DECIMAL"; + public static final String DECIMAL_V3 = "DECIMALV3"; + public static final String DATE = "DATE"; + public static final String DATE_V2 = "DATEV2"; + public static final String DATETIME = "DATETIME"; + public static final String DATETIME_V2 = "DATETIMEV2"; + public static final String CHAR = "CHAR"; + public static final String VARCHAR = "VARCHAR"; + public static final String STRING = "STRING"; + public static final String HLL = "HLL"; + public static final String BITMAP = "BITMAP"; + public static final String ARRAY = "ARRAY"; + public static final String JSONB = "JSONB"; + +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java new file mode 100644 index 0000000..8255bd3 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.catalog.doris; + +public class FieldSchema { + private String name; + private String typeString; + private String comment; + + public FieldSchema() { + } + + public FieldSchema(String name, String typeString, String comment) { + this.name = name; + this.typeString = typeString; + this.comment = comment; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getTypeString() { + return typeString; + } + + public void setTypeString(String typeString) { + this.typeString = typeString; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java new file mode 100644 index 0000000..8f04705 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
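To make the DDL generation in DorisSystem concrete, here is a hypothetical driver (not part of this commit) that builds a two-column schema with the FieldSchema class above and the TableSchema class below, then prints the statement buildCreateTableDDL produces. Passing null connection options is an assumption that the JDBC provider connects lazily; buildCreateTableDDL itself never opens a connection.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;

public class BuildDdlSketch {
    public static void main(String[] args) {
        // LinkedHashMap keeps column order stable in the generated DDL.
        Map<String, FieldSchema> fields = new LinkedHashMap<>();
        fields.put("id", new FieldSchema("id", "BIGINT", "primary key"));
        fields.put("name", new FieldSchema("name", "VARCHAR(300)", "user name"));

        TableSchema schema = new TableSchema();
        schema.setDatabase("ods");
        schema.setTable("user");
        schema.setFields(fields);
        schema.setKeys(Arrays.asList("id"));
        schema.setDistributeKeys(Arrays.asList("id"));
        schema.setModel(DataModel.UNIQUE);
        schema.setTableComment("user table");

        String ddl = new DorisSystem(null).buildCreateTableDDL(schema);
        // Expected shape (whitespace abridged):
        // CREATE TABLE IF NOT EXISTS `ods`.`user`(`id` BIGINT COMMENT 'primary key',
        // `name` VARCHAR(300) COMMENT 'user name' ) UNIQUE KEY(`id`)
        // COMMENT 'user table' DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
        System.out.println(ddl);
    }
}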
+package org.apache.doris.flink.catalog.doris; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TableSchema { + private String database; + private String table; + private String tableComment; + private Map<String, FieldSchema> fields; + private List<String> keys = new ArrayList<>(); + private DataModel model = DataModel.DUPLICATE; + private List<String> distributeKeys = new ArrayList<>(); + private Map<String, String> properties = new HashMap<>(); + + public String getDatabase() { + return database; + } + + public String getTable() { + return table; + } + + public String getTableComment() { + return tableComment; + } + + public Map<String, FieldSchema> getFields() { + return fields; + } + + public List<String> getKeys() { + return keys; + } + + public DataModel getModel() { + return model; + } + + public List<String> getDistributeKeys() { + return distributeKeys; + } + + public Map<String, String> getProperties() { + return properties; + } + + public void setDatabase(String database) { + this.database = database; + } + + public void setTable(String table) { + this.table = table; + } + + public void setTableComment(String tableComment) { + this.tableComment = tableComment; + } + + public void setFields(Map<String, FieldSchema> fields) { + this.fields = fields; + } + + public void setKeys(List<String> keys) { + this.keys = keys; + } + + public void setModel(DataModel model) { + this.model = model; + } + + public void setDistributeKeys(List<String> distributeKeys) { + this.distributeKeys = distributeKeys; + } + + public void setProperties(Map<String, String> properties) { + this.properties = properties; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 36c577a..722f6ef 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -122,7 +122,7 @@ public class DorisExecutionOptions implements Serializable { private int bufferCount = DEFAULT_BUFFER_COUNT; private String labelPrefix = ""; private Properties streamLoadProp = new Properties(); - private boolean enableDelete = false; + private boolean enableDelete = true; private boolean enable2PC = true; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java new file mode 100644 index 0000000..929346c --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.exception; + +/** + * Create Table exception. + */ +public class CreateTableException extends RuntimeException { + public CreateTableException() { + super(); + } + + public CreateTableException(String message) { + super(message); + } + + public CreateTableException(String message, Throwable cause) { + super(message, cause); + } + + public CreateTableException(Throwable cause) { + super(cause); + } + + protected CreateTableException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 711f765..642e1d3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -130,15 +130,16 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr @Override public void write(IN in, Context context) throws IOException { checkLoadException(); + byte[] serialize = serializer.serialize(in); + if(Objects.isNull(serialize)){ + //ddl record + return; + } if(!loading) { //Start streamload only when there has data dorisStreamLoad.startLoad(currentLabel); loading = true; } - byte[] serialize = serializer.serialize(in); - if(Objects.isNull(serialize)){ - return; - } dorisStreamLoad.writeRecord(serialize); } @@ -254,7 +255,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr backend = "http://" + backend; URL url = new URL(backend); HttpURLConnection co = (HttpURLConnection) url.openConnection(); - co.setConnectTimeout(1000); + co.setConnectTimeout(60000); co.connect(); co.disconnect(); return true; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index a637967..3e129aa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -22,6 +22,8 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.table.factories.FactoryUtil; import java.time.Duration; +import java.util.Map; +import java.util.Properties; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; @@ -196,4 +198,16 @@ public class DorisConfigOptions { // Prefix for Doris StreamLoad specific properties. 
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; + public static Properties getStreamLoadProp(Map<String, String> tableOptions) { + final Properties streamLoadProp = new Properties(); + + for (Map.Entry<String, String> entry : tableOptions.entrySet()) { + if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) { + String subKey = entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length()); + streamLoadProp.put(subKey, entry.getValue()); + } + } + return streamLoadProp; + } + } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java new file mode 100644 index 0000000..809c4ea --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc; + + +import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.StringUtils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * cdc sync tools + */ +public class CdcTools { + private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database"; + private static final List<String> EMPTY_KEYS = Arrays.asList("password"); + + public static void main(String[] args) throws Exception { + String operation = args[0].toLowerCase(); + String[] opArgs = Arrays.copyOfRange(args, 1, args.length); + System.out.println(); + switch (operation) { + case MYSQL_SYNC_DATABASE: + createMySQLSyncDatabase(opArgs); + break; + default: + System.out.println("Unknown operation " + operation); + System.exit(1); + } + } + + private static void createMySQLSyncDatabase(String[] opArgs) throws Exception { + MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + String jobName = params.get("job-name"); + String database = params.get("database"); + String tablePrefix = params.get("table-prefix"); + String tableSuffix = params.get("table-suffix"); + String includingTables = params.get("including-tables"); + String excludingTables = params.get("excluding-tables"); + + Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf"); + Map<String, String> sinkMap = getConfigMap(params, "sink-conf"); + Map<String, String> tableMap = getConfigMap(params, "table-conf"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Configuration mysqlConfig = Configuration.fromMap(mysqlMap); + Configuration sinkConfig 
= Configuration.fromMap(sinkMap); + + DatabaseSync databaseSync = new MysqlDatabaseSync(); + databaseSync.create(env, database, mysqlConfig, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConfig, tableMap); + databaseSync.build(); + + if(StringUtils.isNullOrWhitespaceOnly(jobName)){ + jobName = String.format("MySQL-Doris Sync Database: %s", mysqlMap.get("database-name")); + } + env.execute(jobName); + } + + private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) { + if (!params.has(key)) { + return null; + } + + Map<String, String> map = new HashMap<>(); + for (String param : params.getMultiParameter(key)) { + String[] kv = param.split("="); + if (kv.length == 2) { + map.put(kv[0], kv[1]); + continue; + }else if(kv.length == 1 && EMPTY_KEYS.contains(kv[0])){ + map.put(kv[0], ""); + continue; + } + + System.err.println( + "Invalid " + key + " " + param + ".\n"); + return null; + } + return map; + } +} \ No newline at end of file diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java new file mode 100644 index 0000000..84d5b57 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
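A sketch of how the CdcTools entry point above is expected to be launched. All hosts, credentials, and names are placeholders; the flag names come straight from the parameters parsed in createMySQLSyncDatabase, and MultipleParameterTool expects the --key value form with repeated --mysql-conf/--sink-conf/--table-conf flags each carrying one key=value pair.

public class CdcToolsInvocationSketch {
    public static void main(String[] args) throws Exception {
        org.apache.doris.flink.tools.cdc.CdcTools.main(new String[] {
                "mysql-sync-database",
                "--database", "doris_db",
                "--job-name", "mysql2doris",
                "--including-tables", "user|orders.*",
                "--mysql-conf", "hostname=127.0.0.1",
                "--mysql-conf", "port=3306",
                "--mysql-conf", "username=root",
                "--mysql-conf", "password=123456",
                "--mysql-conf", "database-name=app_db",
                "--sink-conf", "fenodes=127.0.0.1:8030",
                "--sink-conf", "jdbc-url=jdbc:mysql://127.0.0.1:9030",
                "--sink-conf", "username=root",
                // A bare "password=" is tolerated because "password" is in
                // EMPTY_KEYS and is mapped to the empty string.
                "--sink-conf", "password=",
                "--sink-conf", "sink.label-prefix=label",
                // Doris table property; replication_num=1 is a placeholder value.
                "--table-conf", "replication_num=1"
        });
    }
}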
+package org.apache.doris.flink.tools.cdc; + +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.doris.flink.catalog.doris.DorisSystem; +import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.cfg.DorisConnectionOptions; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.sink.DorisSink; +import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer; +import org.apache.doris.flink.table.DorisConfigOptions; +import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + +public abstract class DatabaseSync { + private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); + private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; + protected Configuration config; + protected String database; + protected TableNameConverter converter; + protected Pattern includingPattern; + protected Pattern excludingPattern; + protected Map<String, String> tableConfig; + protected Configuration sinkConfig; + public StreamExecutionEnvironment env; + + public abstract Connection getConnection() throws SQLException; + + public abstract List<SourceSchema> getSchemaList() throws Exception; + + public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env); + + + public void create(StreamExecutionEnvironment env, String database, Configuration config, + String tablePrefix, String tableSuffix, String includingTables, + String excludingTables, Configuration sinkConfig, Map<String, String> tableConfig) { + this.env = env; + this.config = config; + this.database = database; + this.converter = new TableNameConverter(tablePrefix, tableSuffix); + this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables); + this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); + this.sinkConfig = sinkConfig; + this.tableConfig = tableConfig == null ? 
new HashMap<>() : tableConfig; + //default enable light schema change + if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){ + this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true"); + } + } + + public void build() throws Exception { + DorisConnectionOptions options = getDorisConnectionOptions(); + DorisSystem dorisSystem = new DorisSystem(options); + + List<SourceSchema> schemaList = getSchemaList(); + if (!dorisSystem.databaseExists(database)) { + LOG.info("database {} not exist, created", database); + dorisSystem.createDatabase(database); + } + + List<String> syncTables = new ArrayList<>(); + List<String> dorisTables = new ArrayList<>(); + for (SourceSchema schema : schemaList) { + syncTables.add(schema.getTableName()); + String dorisTable = converter.convert(schema.getTableName()); + if (!dorisSystem.tableExists(database, dorisTable)) { + TableSchema dorisSchema = schema.convertTableSchema(tableConfig); + //set doris target database + dorisSchema.setDatabase(database); + dorisSchema.setTable(dorisTable); + dorisSystem.createTable(dorisSchema); + } + dorisTables.add(dorisTable); + } + Preconditions.checkState(!syncTables.isEmpty(), "No tables to be synchronized."); + config.set(MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", syncTables) + ")"); + + DataStreamSource<String> streamSource = buildCdcSource(env); + SingleOutputStreamOperator<Void> parsedStream = streamSource.process(new ParsingProcessFunction(converter)); + for (String table : dorisTables) { + OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table); + DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag); + + int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism()); + sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table); + } + } + + private DorisConnectionOptions getDorisConnectionOptions() { + String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES); + String user = sinkConfig.getString(DorisConfigOptions.USERNAME); + String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, ""); + String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL); + Preconditions.checkNotNull(fenodes, "fenodes is empty in sink-conf"); + Preconditions.checkNotNull(user, "username is empty in sink-conf"); + Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf"); + DorisConnectionOptions.DorisConnectionOptionsBuilder builder = new DorisConnectionOptions.DorisConnectionOptionsBuilder() + .withFenodes(fenodes) + .withUsername(user) + .withPassword(passwd) + .withJdbcUrl(jdbcUrl); + return builder.build(); + } + + /** + * create doris sink + */ + public DorisSink<String> buildDorisSink(String table) { + String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES); + String user = sinkConfig.getString(DorisConfigOptions.USERNAME); + String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, ""); + String labelPrefix = sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX); + + DorisSink.Builder<String> builder = DorisSink.builder(); + DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder.setFenodes(fenodes) + .setTableIdentifier(database + "." 
+ table) + .setUsername(user) + .setPassword(passwd); + + Properties pro = new Properties(); + //default json data format + pro.setProperty("format", "json"); + pro.setProperty("read_json_by_line", "true"); + //customer stream load properties + Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap()); + pro.putAll(streamLoadProp); + DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder() + .setLabelPrefix(String.join("-", labelPrefix, database, table)) + .setStreamLoadProp(pro); + + sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE).ifPresent(executionBuilder::setDeletable); + sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT).ifPresent(executionBuilder::setBufferCount); + sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE).ifPresent(executionBuilder::setBufferSize); + sinkConfig.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL).ifPresent(executionBuilder::setCheckInterval); + sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries); + + boolean enable2pc = sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC); + if(!enable2pc){ + executionBuilder.disable2PC(); + } + builder.setDorisReadOptions(DorisReadOptions.builder().build()) + .setDorisExecutionOptions(executionBuilder.build()) + .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build()) + .setDorisOptions(dorisBuilder.build()); + return builder.build(); + } + + /** + * Filter table that need to be synchronized + */ + protected boolean isSyncNeeded(String tableName) { + boolean sync = true; + if (includingPattern != null) { + sync = includingPattern.matcher(tableName).matches(); + } + if (excludingPattern != null) { + sync = sync && !excludingPattern.matcher(tableName).matches(); + } + LOG.debug("table {} is synchronized? {}", tableName, sync); + return sync; + } + + public static class TableNameConverter implements Serializable { + private static final long serialVersionUID = 1L; + private final String prefix; + private final String suffix; + + TableNameConverter(){ + this("",""); + } + + TableNameConverter(String prefix, String suffix) { + this.prefix = prefix == null ? "" : prefix; + this.suffix = suffix == null ? "" : suffix; + } + + public String convert(String tableName) { + return prefix + tableName + suffix; + } + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java similarity index 96% copy from flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java copy to flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java index 9d73f53..ed5b2b6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. 
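A tiny illustration (not in the commit) of the TableNameConverter above and of the full-match semantics isSyncNeeded applies to the including/excluding patterns. It sits in the same package because the converter constructors are package-private.

package org.apache.doris.flink.tools.cdc;

import java.util.regex.Pattern;

public class TableNameConverterSketch {
    public static void main(String[] args) {
        DatabaseSync.TableNameConverter converter =
                new DatabaseSync.TableNameConverter("ods_", "_mysql");
        System.out.println(converter.convert("orders")); // ods_orders_mysql

        // Pattern.matches() requires the whole name to match, so "orders.*"
        // selects orders_2023 but not a substring hit like tmp_orders.
        Pattern including = Pattern.compile("user|orders.*");
        System.out.println(including.matcher("orders_2023").matches()); // true
        System.out.println(including.matcher("tmp_orders").matches());  // false
    }
}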
-package org.apache.doris.flink.utils; +package org.apache.doris.flink.tools.cdc; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; -import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,11 +47,11 @@ public class DateToStringConverter implements CustomConverter<SchemaBuilder, Rel static { DEFAULT_PROPS.setProperty("converters", "date"); - DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.utils.DateToStringConverter"); + DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.tools.cdc.DateToStringConverter"); DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd"); DEFAULT_PROPS.setProperty("date.format.datetime", "yyyy-MM-dd HH:mm:ss"); - DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss"); - DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC"); + DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS"); + DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC+8"); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java new file mode 100644 index 0000000..03aa90e --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
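Assuming the converter applies the formats configured above, MySQL temporal values reach the sink as plain strings, with timestamps now carrying microseconds (matching DATETIMEV2 precision) and rendered in UTC+8 per date.format.timestamp.zone. A minimal sketch of the resulting shapes, with the converter internals omitted:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateFormatSketch {
    public static void main(String[] args) {
        DateTimeFormatter datetime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        DateTimeFormatter timestamp = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
        LocalDateTime t = LocalDateTime.of(2023, 6, 5, 11, 8, 15, 123456000);
        System.out.println(datetime.format(t));  // 2023-06-05 11:08:15
        System.out.println(timestamp.format(t)); // 2023-06-05 11:08:15.123456
    }
}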
+package org.apache.doris.flink.tools.cdc; + +import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.tools.cdc.mysql.MysqlType; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class SourceSchema { + private final String databaseName; + private final String tableName; + private final String tableComment; + private final LinkedHashMap<String, FieldSchema> fields; + public final List<String> primaryKeys; + + public SourceSchema( + DatabaseMetaData metaData, String databaseName, String tableName, String tableComment) + throws Exception { + this.databaseName = databaseName; + this.tableName = tableName; + this.tableComment = tableComment; + + fields = new LinkedHashMap<>(); + try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) { + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + String comment = rs.getString("REMARKS"); + String fieldType = rs.getString("TYPE_NAME"); + Integer precision = rs.getInt("COLUMN_SIZE"); + + if (rs.wasNull()) { + precision = null; + } + Integer scale = rs.getInt("DECIMAL_DIGITS"); + if (rs.wasNull()) { + scale = null; + } + String dorisTypeStr = MysqlType.toDorisType(fieldType, precision, scale); + fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment)); + } + } + + primaryKeys = new ArrayList<>(); + try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) { + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + primaryKeys.add(fieldName); + } + } + } + + public TableSchema convertTableSchema(Map<String, String> tableProps) { + TableSchema tableSchema = new TableSchema(); + tableSchema.setModel(DataModel.UNIQUE); + tableSchema.setFields(this.fields); + tableSchema.setKeys(this.primaryKeys); + tableSchema.setTableComment(this.tableComment); + tableSchema.setDistributeKeys(this.primaryKeys); + tableSchema.setProperties(tableProps); + return tableSchema; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + public LinkedHashMap<String, FieldSchema> getFields() { + return fields; + } + + public List<String> getPrimaryKeys() { + return primaryKeys; + } + + public String getTableComment() { + return tableComment; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java new file mode 100644 index 0000000..caa975d --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -0,0 +1,192 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc.mysql; + +import com.ververica.cdc.connectors.mysql.source.MySqlSource; +import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; +import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils; +import com.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.DebeziumOptions; +import org.apache.doris.flink.tools.cdc.DatabaseSync; +import org.apache.doris.flink.tools.cdc.DateToStringConverter; +import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class MysqlDatabaseSync extends DatabaseSync { + private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class); + + public MysqlDatabaseSync() { + } + + @Override + public Connection getConnection() throws SQLException { + return DriverManager.getConnection( + String.format( + "jdbc:mysql://%s:%d?useInformationSchema=true", + config.get(MySqlSourceOptions.HOSTNAME), + config.get(MySqlSourceOptions.PORT)), + config.get(MySqlSourceOptions.USERNAME), + config.get(MySqlSourceOptions.PASSWORD)); + } + + @Override + public List<SourceSchema> getSchemaList() throws Exception { + String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME); + List<SourceSchema> schemaList = new ArrayList<>(); + try (Connection conn = getConnection()) { + DatabaseMetaData metaData = conn.getMetaData(); + try (ResultSet tables = + metaData.getTables(databaseName, null, "%", new String[]{"TABLE"})) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + String tableComment = tables.getString("REMARKS"); + if (!isSyncNeeded(tableName)) { + continue; + } + SourceSchema sourceSchema = + new SourceSchema(metaData, databaseName, tableName, tableComment); + if (sourceSchema.primaryKeys.size() > 0) { + //Only sync tables with primary keys + schemaList.add(sourceSchema); + } else { + LOG.warn("table {} has no primary key, skip", tableName); + System.out.println("table " + tableName + " has no primary key, skip."); + } + } + } + } + return schemaList; + } + + @Override + public DataStreamSource<String> 
buildCdcSource(StreamExecutionEnvironment env) { + MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder(); + + String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME); + Preconditions.checkNotNull(databaseName, "database-name in mysql is required"); + String tableName = config.get(MySqlSourceOptions.TABLE_NAME); + sourceBuilder + .hostname(config.get(MySqlSourceOptions.HOSTNAME)) + .port(config.get(MySqlSourceOptions.PORT)) + .username(config.get(MySqlSourceOptions.USERNAME)) + .password(config.get(MySqlSourceOptions.PASSWORD)) + .databaseList(databaseName) + .tableList(databaseName + "." + tableName); + + config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId); + config + .getOptional(MySqlSourceOptions.SERVER_TIME_ZONE) + .ifPresent(sourceBuilder::serverTimeZone); + config + .getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE) + .ifPresent(sourceBuilder::fetchSize); + config + .getOptional(MySqlSourceOptions.CONNECT_TIMEOUT) + .ifPresent(sourceBuilder::connectTimeout); + config + .getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES) + .ifPresent(sourceBuilder::connectMaxRetries); + config + .getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE) + .ifPresent(sourceBuilder::connectionPoolSize); + config + .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL) + .ifPresent(sourceBuilder::heartbeatInterval); + config + .getOptional(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED) + .ifPresent(sourceBuilder::scanNewlyAddedTableEnabled); + + String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE); + if ("initial".equalsIgnoreCase(startupMode)) { + sourceBuilder.startupOptions(StartupOptions.initial()); + } else if ("earliest-offset".equalsIgnoreCase(startupMode)) { + sourceBuilder.startupOptions(StartupOptions.earliest()); + } else if ("latest-offset".equalsIgnoreCase(startupMode)) { + sourceBuilder.startupOptions(StartupOptions.latest()); + } else if ("specific-offset".equalsIgnoreCase(startupMode)) { + BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder(); + String file = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + Long pos = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS); + if (file != null && pos != null) { + offsetBuilder.setBinlogFilePosition(file, pos); + } + config + .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET) + .ifPresent(offsetBuilder::setGtidSet); + config + .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS) + .ifPresent(offsetBuilder::setSkipEvents); + config + .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS) + .ifPresent(offsetBuilder::setSkipRows); + sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build())); + } else if ("timestamp".equalsIgnoreCase(startupMode)) { + sourceBuilder.startupOptions( + StartupOptions.timestamp( + config.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS))); + } + + Properties jdbcProperties = new Properties(); + Properties debeziumProperties = new Properties(); + //date to string + debeziumProperties.putAll(DateToStringConverter.DEFAULT_PROPS); + + for (Map.Entry<String, String> entry : config.toMap().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) { + jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value); + } else if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) { + debeziumProperties.put( + 
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value); + } + } + sourceBuilder.jdbcProperties(jdbcProperties); + sourceBuilder.debeziumProperties(debeziumProperties); + + Map<String, Object> customConverterConfigs = new HashMap<>(); + customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); + JsonDebeziumDeserializationSchema schema = + new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + MySqlSource<String> mySqlSource = sourceBuilder.deserializer(schema).includeSchemaChanges(true).build(); + + DataStreamSource<String> streamSource = env.fromSource( + mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source"); + return streamSource; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java new file mode 100644 index 0000000..92325ac --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
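For completeness, a condensed sketch of driving the sync programmatically rather than through CdcTools, in the spirit of the CdcMysqlSyncDatabaseCase test added by this commit; hosts and credentials are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlSyncSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, String> mysqlConfig = new HashMap<>();
        mysqlConfig.put("hostname", "127.0.0.1");
        mysqlConfig.put("port", "3306");
        mysqlConfig.put("username", "root");
        mysqlConfig.put("password", "123456");
        mysqlConfig.put("database-name", "app_db");

        Map<String, String> sinkConfig = new HashMap<>();
        sinkConfig.put("fenodes", "127.0.0.1:8030");
        sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
        sinkConfig.put("username", "root");
        sinkConfig.put("password", "");
        sinkConfig.put("sink.label-prefix", "label");

        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put("replication_num", "1");

        // create(env, database, config, tablePrefix, tableSuffix,
        //        includingTables, excludingTables, sinkConfig, tableConfig)
        DatabaseSync databaseSync = new MysqlDatabaseSync();
        databaseSync.create(env, "doris_db", Configuration.fromMap(mysqlConfig),
                null, null, ".*", null, Configuration.fromMap(sinkConfig), tableConfig);
        databaseSync.build();
        env.execute("MySQL-Doris Sync Database: app_db");
    }
}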
+package org.apache.doris.flink.tools.cdc.mysql; + +import org.apache.doris.flink.catalog.doris.DorisType; +import org.apache.flink.util.Preconditions; + +public class MysqlType { + private static final String BIT = "BIT"; + private static final String BOOLEAN = "BOOLEAN"; + private static final String BOOL = "BOOL"; + private static final String TINYINT = "TINYINT"; + private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED"; + private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL"; + private static final String SMALLINT = "SMALLINT"; + private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; + private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL"; + private static final String MEDIUMINT = "MEDIUMINT"; + private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; + private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL"; + private static final String INT = "INT"; + private static final String INT_UNSIGNED = "INT UNSIGNED"; + private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL"; + private static final String BIGINT = "BIGINT"; + private static final String SERIAL = "SERIAL"; + private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL"; + private static final String REAL = "REAL"; + private static final String REAL_UNSIGNED = "REAL UNSIGNED"; + private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL"; + private static final String FLOAT = "FLOAT"; + private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED"; + private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL"; + private static final String DOUBLE = "DOUBLE"; + private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL"; + private static final String DOUBLE_PRECISION = "DOUBLE PRECISION"; + private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED"; + private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL = + "DOUBLE PRECISION UNSIGNED ZEROFILL"; + private static final String NUMERIC = "NUMERIC"; + private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED"; + private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL"; + private static final String FIXED = "FIXED"; + private static final String FIXED_UNSIGNED = "FIXED UNSIGNED"; + private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL"; + private static final String DECIMAL = "DECIMAL"; + private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL"; + private static final String CHAR = "CHAR"; + private static final String VARCHAR = "VARCHAR"; + private static final String TINYTEXT = "TINYTEXT"; + private static final String MEDIUMTEXT = "MEDIUMTEXT"; + private static final String TEXT = "TEXT"; + private static final String LONGTEXT = "LONGTEXT"; + private static final String DATE = "DATE"; + private static final String TIME = "TIME"; + private static final String DATETIME = "DATETIME"; + private static final String TIMESTAMP = "TIMESTAMP"; + private static final String YEAR = "YEAR"; + private static final String BINARY = "BINARY"; + private static final String VARBINARY = "VARBINARY"; + private static final String TINYBLOB = "TINYBLOB"; + 
private static final String MEDIUMBLOB = "MEDIUMBLOB"; + private static final String BLOB = "BLOB"; + private static final String LONGBLOB = "LONGBLOB"; + private static final String JSON = "JSON"; + private static final String ENUM = "ENUM"; + + public static String toDorisType(String type, Integer length, Integer scale) { + switch (type.toUpperCase()) { + case BIT: + case BOOLEAN: + case BOOL: + return DorisType.BOOLEAN; + case TINYINT: + return DorisType.TINYINT; + case TINYINT_UNSIGNED: + case TINYINT_UNSIGNED_ZEROFILL: + case SMALLINT: + return DorisType.SMALLINT; + case SMALLINT_UNSIGNED: + case SMALLINT_UNSIGNED_ZEROFILL: + case INT: + case MEDIUMINT: + case YEAR: + return DorisType.INT; + case INT_UNSIGNED: + case INT_UNSIGNED_ZEROFILL: + case MEDIUMINT_UNSIGNED: + case MEDIUMINT_UNSIGNED_ZEROFILL: + case BIGINT: + return DorisType.BIGINT; + case BIGINT_UNSIGNED: + case BIGINT_UNSIGNED_ZEROFILL: + return DorisType.LARGEINT; + case FLOAT: + case FLOAT_UNSIGNED: + case FLOAT_UNSIGNED_ZEROFILL: + return DorisType.FLOAT; + case REAL: + case REAL_UNSIGNED: + case REAL_UNSIGNED_ZEROFILL: + case DOUBLE: + case DOUBLE_UNSIGNED: + case DOUBLE_UNSIGNED_ZEROFILL: + case DOUBLE_PRECISION: + case DOUBLE_PRECISION_UNSIGNED: + case DOUBLE_PRECISION_UNSIGNED_ZEROFILL: + return DorisType.DOUBLE; + case NUMERIC: + case NUMERIC_UNSIGNED: + case NUMERIC_UNSIGNED_ZEROFILL: + case FIXED: + case FIXED_UNSIGNED: + case FIXED_UNSIGNED_ZEROFILL: + case DECIMAL: + case DECIMAL_UNSIGNED: + case DECIMAL_UNSIGNED_ZEROFILL: + return length != null && length <= 38 + ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, length, scale != null && scale >= 0 ? scale : 0) + : DorisType.STRING; + case DATE: + return DorisType.DATE_V2; + case DATETIME: + case TIMESTAMP: + return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(length == null ? 0 : length, 6)); + case CHAR: + Preconditions.checkNotNull(length); + return String.format("%s(%s)", DorisType.CHAR, length); + case VARCHAR: + Preconditions.checkNotNull(length); + return length * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, length * 3); + case TINYTEXT: + case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case TIME: + case TINYBLOB: + case BLOB: + case MEDIUMBLOB: + case LONGBLOB: + case BINARY: + case VARBINARY: + return DorisType.STRING; + case JSON: + return DorisType.JSONB; + default: + throw new UnsupportedOperationException("Unsupported MySQL Type: " + type); + } + + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java new file mode 100644 index 0000000..563c848 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc.mysql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.tools.cdc.DatabaseSync; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.HashMap; +import java.util.Map; + +public class ParsingProcessFunction extends ProcessFunction<String, Void> { + private ObjectMapper objectMapper = new ObjectMapper(); + private transient Map<String, OutputTag<String>> recordOutputTags; + private DatabaseSync.TableNameConverter converter; + + public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) { + this.converter = converter; + } + + @Override + public void open(Configuration parameters) throws Exception { + recordOutputTags = new HashMap<>(); + } + + @Override + public void processElement(String record, ProcessFunction<String, Void>.Context context, Collector<Void> collector) throws Exception { + JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); + String tableName = extractJsonNode(recordRoot.get("source"), "table"); + String dorisName = converter.convert(tableName); + context.output(getRecordOutputTag(dorisName), record); + } + + private String extractJsonNode(JsonNode record, String key) { + return record != null && record.get(key) != null ? 
record.get(key).asText() : null; + } + + private OutputTag<String> getRecordOutputTag(String tableName) { + return recordOutputTags.computeIfAbsent( + tableName, ParsingProcessFunction::createRecordOutputTag); + } + + public static OutputTag<String> createRecordOutputTag(String tableName) { + return new OutputTag<String>("record-" + tableName) { + }; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java index bdc0584..3bad9be 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java @@ -18,16 +18,17 @@ package org.apache.doris.flink; import com.ververica.cdc.connectors.mysql.source.MySqlSource; +import com.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.DorisSink; -import org.apache.doris.flink.utils.DateToStringConverter; import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer; +import org.apache.doris.flink.utils.DateToStringConverter; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.kafka.connect.json.JsonConverterConfig; import java.util.HashMap; import java.util.Map; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java new file mode 100644 index 0000000..cbd3b38 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.tools.cdc; + +import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class CdcMysqlSyncDatabaseCase { + + public static void main(String[] args) throws Exception{ + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// env.setParallelism(1); + + Map<String,String> flinkMap = new HashMap<>(); + flinkMap.put("execution.checkpointing.interval","10s"); + flinkMap.put("pipeline.operator-chaining","false"); + flinkMap.put("parallelism.default","1"); + + + Configuration configuration = Configuration.fromMap(flinkMap); + env.configure(configuration); + + String database = "db1"; + String tablePrefix = ""; + String tableSuffix = ""; + Map<String,String> mysqlConfig = new HashMap<>(); + mysqlConfig.put("database-name","db1"); + mysqlConfig.put("hostname","127.0.0.1"); + mysqlConfig.put("port","3306"); + mysqlConfig.put("username","root"); + mysqlConfig.put("password",""); + Configuration config = Configuration.fromMap(mysqlConfig); + + Map<String,String> sinkConfig = new HashMap<>(); + sinkConfig.put("fenodes","127.0.0.1:8030"); + sinkConfig.put("username","root"); + sinkConfig.put("password",""); + sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030"); + sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); + Configuration sinkConf = Configuration.fromMap(sinkConfig); + + Map<String,String> tableConfig = new HashMap<>(); + tableConfig.put("replication_num", "1"); + + String includingTables = "tbl1|tbl2|tbl3"; + String excludingTables = ""; + DatabaseSync databaseSync = new MysqlDatabaseSync(); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig); + databaseSync.build(); + env.execute(String.format("MySQL-Doris Database Sync: %s", database)); + + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java index 9d73f53..1f1fe61 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java @@ -17,9 +17,9 @@ package org.apache.doris.flink.utils; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; -import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org