This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
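The patch below adds a Flink Catalog implementation for Doris, so Doris databases and
tables can be listed and queried from Flink SQL without declaring them via DDL first. For
orientation, a minimal registration sketch against the new API (it mirrors the CatalogTest
setup in the patch; the host and credentials are illustrative placeholders, and `fenodes`
may be omitted because the catalog discovers frontends itself via SHOW FRONTENDS):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.doris.flink.catalog.DorisCatalog;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    Map<String, String> props = new HashMap<>();
    props.put("sink.enable-2pc", "false");        // same toggle CatalogTest uses
    DorisCatalog catalog = new DorisCatalog(
            "doris_catalog",                      // catalog name
            "jdbc:mysql://127.0.0.1:9030",        // jdbc-url: Doris FE MySQL-protocol port (placeholder)
            "test",                               // default database
            "root",                               // username (placeholder)
            "",                                   // password (placeholder)
            props);
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.registerCatalog("doris_catalog", catalog);
    tEnv.useCatalog("doris_catalog");
    // Tables resolve through DorisCatalog#getTable -- no CREATE TABLE needed:
    tEnv.sqlQuery("SELECT * FROM test.t_all_types").execute().print();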
The following commit(s) were added to refs/heads/master by this push:
     new 00b8821  [Feature] support doris catalog (#60)
00b8821 is described below

commit 00b882190e7d9492c6c77c8ce8474ba618be0ed6
Author: wudi <676366...@qq.com>
AuthorDate: Tue Sep 6 16:35:04 2022 +0800

    [Feature] support doris catalog (#60)

    * add doris catalog and fix thrift Concurrency bug
---
 .../apache/doris/flink/catalog/DorisCatalog.java   | 526 +++++++++++++++++++++
 .../doris/flink/catalog/DorisCatalogFactory.java   | 124 +++++
 .../doris/flink/catalog/DorisCatalogOptions.java   |  26 +
 .../doris/flink/catalog/DorisTypeMapper.java       |  89 ++++
 .../source/reader/DorisSourceSplitReader.java      |   8 -
 .../flink/source/reader/DorisValueReader.java      |  94 ++--
 .../doris/flink/table/DorisConfigOptions.java      | 145 ++++++
 .../flink/table/DorisDynamicTableFactory.java      | 152 ++----
 .../org.apache.flink.table.factories.Factory       |   3 +-
 .../apache/doris/flink/catalog/CatalogTest.java    | 263 +++++++++++
 .../flink/source/reader/DorisSourceReaderTest.java |   2 +
 11 files changed, 1263 insertions(+), 169 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
new file mode 100644
index 0000000..00f7fb3
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.table.DorisDynamicTableFactory;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.function.Predicate;
+
+import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Doris catalog.
+ */
+public class DorisCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class);
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                }
+            };
+
+    private final String username;
+    private final String password;
+    private final String jdbcUrl;
+    private final Map<String, String> properties;
+
+    public DorisCatalog(
+            String catalogName,
+            String jdbcUrl,
+            String defaultDatabase,
+            String username,
+            String password,
+            Map<String, String> properties) {
+        super(catalogName, defaultDatabase);
+
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(jdbcUrl), "jdbc-url cannot be null or empty");
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(username), "username cannot be null or empty");
+
+        // normalize the URL so database names can be appended directly
+        this.jdbcUrl = jdbcUrl.endsWith("/") ? jdbcUrl : jdbcUrl + "/";
+        this.username = username;
+        this.password = password;
+        this.properties = Collections.unmodifiableMap(properties);
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        // test connection, fail early if we cannot connect to database
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
+        } catch (SQLException e) {
+            throw new ValidationException(
+                    String.format("Failed connecting to %s via JDBC.", jdbcUrl), e);
+        }
+
+        LOG.info("Catalog {} established connection to {}", getName(), jdbcUrl);
+    }
+
+    @Override
+    public synchronized void close() throws CatalogException {
+        try {
+            LOG.info("Closed catalog {} ", getName());
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Closing catalog %s failed.", getName()), e);
+        }
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(new DorisDynamicTableFactory());
+    }
+
+    // ------------- databases -------------
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return extractColumnValuesBySQL(
+                jdbcUrl,
+                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
+                1,
+                dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotEmptyException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------------- tables -------------
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                org.apache.commons.lang3.StringUtils.isNotBlank(databaseName), "Database name must not be
blank."); + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return extractColumnValuesBySQL( + jdbcUrl + databaseName, + "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?", + 1, + null, + databaseName); + } + + @Override + public List<String> listViews(String databaseName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + String databaseName = tablePath.getDatabaseName(); + String tableName = tablePath.getObjectName(); + Map<String, String> props = new HashMap<>(properties); + props.put(CONNECTOR.key(), IDENTIFIER); + if (!props.containsKey(FENODES.key())) { + props.put(FENODES.key(), queryFenodes()); + } + props.put(USERNAME.key(), username); + props.put(PASSWORD.key(), password); + props.put(TABLE_IDENTIFIER.key(), databaseName + "." + tableName); + + String labelPrefix = props.getOrDefault(SINK_LABEL_PREFIX.key(),""); + props.put(SINK_LABEL_PREFIX.key(), String.join("_",labelPrefix,databaseName,tableName)); + //remove catalog option + props.remove(JDBCURL.key()); + props.remove(DEFAULT_DATABASE.key()); + return CatalogTable.of(createTableSchema(databaseName, tableName), null, Lists.newArrayList(), props); + + } + + @VisibleForTesting + protected String queryFenodes() { + try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { + StringJoiner fenodes = new StringJoiner(","); + PreparedStatement ps = conn.prepareStatement("SHOW FRONTENDS"); + ResultSet resultSet = ps.executeQuery(); + while (resultSet.next()) { + String ip = resultSet.getString("IP"); + String port = resultSet.getString("HttpPort"); + fenodes.add(ip + ":" + port); + } + return fenodes.toString(); + } catch (Exception e) { + throw new CatalogException("Failed getting fenodes", e); + } + } + + private Schema createTableSchema(String databaseName, String tableName) { + String dbUrl = jdbcUrl + databaseName; + try (Connection conn = DriverManager.getConnection(dbUrl, username, password)) { + PreparedStatement ps = + conn.prepareStatement( + String.format("SELECT COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'", databaseName, tableName)); + + List<String> columnNames = new ArrayList<>(); + List<DataType> columnTypes = new ArrayList<>(); + ResultSet resultSet = ps.executeQuery(); + while (resultSet.next()) { + String columnName = resultSet.getString("COLUMN_NAME"); + String columnType = resultSet.getString("DATA_TYPE"); + long columnSize = resultSet.getLong("COLUMN_SIZE"); + long columnDigit = resultSet.getLong("DECIMAL_DIGITS"); + DataType flinkType = DorisTypeMapper.toFlinkType(columnName, columnType, (int) columnSize, (int) columnDigit); + columnNames.add(columnName); + columnTypes.add(flinkType); + } + Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, columnTypes); + Schema tableSchema = schemaBuilder.build(); + return tableSchema; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting catalog %s database %s table %s", getName(), databaseName, tableName), e); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + try { + return 
databaseExists(tablePath.getDatabaseName()) + && listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName()); + } catch (DatabaseNotExistException e) { + return false; + } + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTable( + ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------------- partitions ------------- + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List<CatalogPartitionSpec> listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List<CatalogPartitionSpec> listPartitionsByFilter( + ObjectPath tablePath, List<Expression> filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, PartitionAlreadyExistsException, + CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------------- functions ------------- + + @Override + public List<String> listFunctions(String dbName) + throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + throw new FunctionNotExistException(getName(), functionPath); 
+ } + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + return false; + } + + @Override + public void createFunction( + ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------------- statistics ------------- + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public void alterTableStatistics( + ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + + private List<String> extractColumnValuesBySQL( + String connUrl, + String sql, + int columnIndex, + Predicate<String> filterFunc, + Object... 
params) { + + List<String> columnValues = Lists.newArrayList(); + + try (Connection conn = DriverManager.getConnection(connUrl, username, password); + PreparedStatement ps = conn.prepareStatement(sql)) { + if (Objects.nonNull(params) && params.length > 0) { + for (int i = 0; i < params.length; i++) { + ps.setObject(i + 1, params[i]); + } + } + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + String columnValue = rs.getString(columnIndex); + if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) { + columnValues.add(columnValue); + } + } + return columnValues; + } catch (Exception e) { + throw new CatalogException( + String.format( + "The following SQL query could not be executed (%s): %s", connUrl, sql), + e); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java new file mode 100644 index 0000000..f23f0dc --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.catalog; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE; +import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.FENODES; +import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER; +import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_CHECK_INTERVAL; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; +import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; +import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; +import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; +import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME; + +/** + * Factory for {@link DorisCatalog}. 
+ */ +public class DorisCatalogFactory implements CatalogFactory { + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(JDBCURL); + options.add(USERNAME); + options.add(PASSWORD); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(JDBCURL); + options.add(DEFAULT_DATABASE); + + options.add(FENODES); + options.add(TABLE_IDENTIFIER); + options.add(USERNAME); + options.add(PASSWORD); + + options.add(DORIS_READ_FIELD); + options.add(DORIS_FILTER_QUERY); + options.add(DORIS_TABLET_SIZE); + options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS); + options.add(DORIS_REQUEST_READ_TIMEOUT_MS); + options.add(DORIS_REQUEST_QUERY_TIMEOUT_S); + options.add(DORIS_REQUEST_RETRIES); + options.add(DORIS_DESERIALIZE_ARROW_ASYNC); + options.add(DORIS_DESERIALIZE_QUEUE_SIZE); + options.add(DORIS_BATCH_SIZE); + options.add(DORIS_EXEC_MEM_LIMIT); + + options.add(SINK_CHECK_INTERVAL); + options.add(SINK_ENABLE_2PC); + options.add(SINK_MAX_RETRIES); + options.add(SINK_ENABLE_DELETE); + options.add(SINK_LABEL_PREFIX); + options.add(SINK_BUFFER_SIZE); + options.add(SINK_BUFFER_COUNT); + + options.add(SOURCE_USE_OLD_API); + return options; + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validateExcept(STREAM_LOAD_PROP_PREFIX); + + return new DorisCatalog( + context.getName(), + helper.getOptions().get(JDBCURL), + helper.getOptions().get(DEFAULT_DATABASE), + helper.getOptions().get(USERNAME), + helper.getOptions().get(PASSWORD), + ((Configuration) helper.getOptions()).toMap()); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java new file mode 100644 index 0000000..ff87f9b --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.catalog; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.CommonCatalogOptions; + +public class DorisCatalogOptions { + public static final ConfigOption<String> JDBCURL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url."); + public static final ConfigOption<String> DEFAULT_DATABASE = ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY).stringType().noDefaultValue(); +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java new file mode 100644 index 0000000..f7a16ce --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +public class DorisTypeMapper { + + // -------------------------number---------------------------- + private static final String DORIS_TINYINT = "TINYINT"; + private static final String DORIS_SMALLINT = "SMALLINT"; + private static final String DORIS_INT = "INT"; + private static final String DORIS_BIGINT = "BIGINT"; + private static final String DORIS_LARGEINT = "BIGINT UNSIGNED"; + private static final String DORIS_DECIMAL = "DECIMAL"; + private static final String DORIS_FLOAT = "FLOAT"; + private static final String DORIS_DOUBLE = "DOUBLE"; + + // -------------------------string---------------------------- + private static final String DORIS_CHAR = "CHAR"; + private static final String DORIS_VARCHAR = "VARCHAR"; + private static final String DORIS_STRING = "STRING"; + private static final String DORIS_TEXT = "TEXT"; + + // ------------------------------time------------------------- + private static final String DORIS_DATE = "DATE"; + private static final String DORIS_DATETIME = "DATETIME"; + + //------------------------------bool------------------------ + private static final String DORIS_BOOLEAN = "BOOLEAN"; + + + public static DataType toFlinkType(String columnName, String columnType, int precision, int scale) { + columnType = columnType.toUpperCase(); + switch (columnType) { + case DORIS_BOOLEAN: + return DataTypes.BOOLEAN(); + case DORIS_TINYINT: + if (precision == 0) { + //The boolean type will become tinyint when queried in information_schema, and precision=0 + return DataTypes.BOOLEAN(); + } else { + return DataTypes.TINYINT(); + } + case DORIS_SMALLINT: + return DataTypes.SMALLINT(); + case DORIS_INT: + return DataTypes.INT(); + case DORIS_BIGINT: + 
return DataTypes.BIGINT(); + case DORIS_DECIMAL: + return DataTypes.DECIMAL(precision, scale); + case DORIS_FLOAT: + return DataTypes.FLOAT(); + case DORIS_DOUBLE: + return DataTypes.DOUBLE(); + case DORIS_CHAR: + return DataTypes.CHAR(precision); + case DORIS_LARGEINT: + case DORIS_VARCHAR: + case DORIS_STRING: + case DORIS_TEXT: + return DataTypes.STRING(); + case DORIS_DATE: + return DataTypes.DATE(); + case DORIS_DATETIME: + return DataTypes.TIMESTAMP(0); + default: + throw new UnsupportedOperationException( + String.format( + "Doesn't support Doris type '%s' on column '%s'", columnType, columnName)); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index c5d33f7..b12bf2a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -74,14 +74,6 @@ public class DorisSourceSplitReader } private DorisSplitRecords finishSplit() { - if (valueReader != null) { - try { - valueReader.close(); - } catch (Exception e) { - LOG.error("close resource reader failed,", e); - } - valueReader = null; - } final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId); currentSplitId = null; return finishRecords; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java index 173ea90..04474f0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -41,6 +41,8 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER; @@ -53,6 +55,8 @@ import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAG public class DorisValueReader implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DorisValueReader.class); protected BackendClient client; + protected Lock clientLock = new ReentrantLock(); + private PartitionDefinition partition; private DorisOptions options; private DorisReadOptions readOptions; @@ -85,10 +89,15 @@ public class DorisValueReader implements AutoCloseable { } private void init() { - this.openParams = openParams(); - TScanOpenResult openResult = this.client.openScanner(this.openParams); - this.contextId = openResult.getContextId(); - this.schema = SchemaUtils.convertToSchema(openResult.getSelectedColumns()); + clientLock.lock(); + try { + this.openParams = openParams(); + TScanOpenResult openResult = this.client.openScanner(this.openParams); + this.contextId = openResult.getContextId(); + this.schema = SchemaUtils.convertToSchema(openResult.getSelectedColumns()); + } finally { + clientLock.unlock(); + } this.asyncThreadStarted = asyncThreadStarted(); LOG.debug("Open scan result is, contextId: {}, schema: {}.", contextId, 
schema); } @@ -127,22 +136,27 @@ public class DorisValueReader implements AutoCloseable { protected Thread asyncThread = new Thread(new Runnable() { @Override public void run() { - TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); - nextBatchParams.setContextId(contextId); - while (!eos.get()) { - nextBatchParams.setOffset(offset); - TScanBatchResult nextResult = client.getNext(nextBatchParams); - eos.set(nextResult.isEos()); - if (!eos.get()) { - RowBatch rowBatch = new RowBatch(nextResult, schema).readArrow(); - offset += rowBatch.getReadRowCount(); - rowBatch.close(); - try { - rowBatchBlockingQueue.put(rowBatch); - } catch (InterruptedException e) { - throw new DorisRuntimeException(e); + clientLock.lock(); + try{ + TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); + nextBatchParams.setContextId(contextId); + while (!eos.get()) { + nextBatchParams.setOffset(offset); + TScanBatchResult nextResult = client.getNext(nextBatchParams); + eos.set(nextResult.isEos()); + if (!eos.get()) { + RowBatch rowBatch = new RowBatch(nextResult, schema).readArrow(); + offset += rowBatch.getReadRowCount(); + rowBatch.close(); + try { + rowBatchBlockingQueue.put(rowBatch); + } catch (InterruptedException e) { + throw new DorisRuntimeException(e); + } } } + } finally { + clientLock.unlock(); } } }); @@ -187,22 +201,27 @@ public class DorisValueReader implements AutoCloseable { hasNext = true; } } else { - // Arrow data was acquired synchronously during the iterative process - if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) { - if (rowBatch != null) { - offset += rowBatch.getReadRowCount(); - rowBatch.close(); - } - TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); - nextBatchParams.setContextId(contextId); - nextBatchParams.setOffset(offset); - TScanBatchResult nextResult = client.getNext(nextBatchParams); - eos.set(nextResult.isEos()); - if (!eos.get()) { - rowBatch = new RowBatch(nextResult, schema).readArrow(); + clientLock.lock(); + try{ + // Arrow data was acquired synchronously during the iterative process + if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) { + if (rowBatch != null) { + offset += rowBatch.getReadRowCount(); + rowBatch.close(); + } + TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); + nextBatchParams.setContextId(contextId); + nextBatchParams.setOffset(offset); + TScanBatchResult nextResult = client.getNext(nextBatchParams); + eos.set(nextResult.isEos()); + if (!eos.get()) { + rowBatch = new RowBatch(nextResult, schema).readArrow(); + } } + hasNext = !eos.get(); + } finally { + clientLock.unlock(); } - hasNext = !eos.get(); } return hasNext; } @@ -222,8 +241,13 @@ public class DorisValueReader implements AutoCloseable { @Override public void close() throws Exception { - TScanCloseParams closeParams = new TScanCloseParams(); - closeParams.setContextId(contextId); - client.closeScanner(closeParams); + clientLock.lock(); + try { + TScanCloseParams closeParams = new TScanCloseParams(); + closeParams.setContextId(contextId); + client.closeScanner(closeParams); + } finally { + clientLock.unlock(); + } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java new file mode 100644 index 0000000..5b56342 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; + +public class DorisConfigOptions { + + public static final String IDENTIFIER = "doris"; + // common option + public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address."); + public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name."); + public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name."); + public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password."); + + // source config options + public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions + .key("doris.read.field") + .stringType() + .noDefaultValue() + .withDescription("List of column names in the Doris table, separated by commas"); + public static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions + .key("doris.filter.query") + .stringType() + .noDefaultValue() + .withDescription("Filter expression of the query, which is transparently transmitted to Doris. 
Doris uses this expression to complete source-side data filtering");
+    public static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
+            .key("doris.request.tablet.size")
+            .intType()
+            .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
+            .key("doris.request.connect.timeout.ms")
+            .intType()
+            .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
+            .key("doris.request.read.timeout.ms")
+            .intType()
+            .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
+            .key("doris.request.query.timeout.s")
+            .intType()
+            .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
+            .key("doris.request.retries")
+            .intType()
+            .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
+            .key("doris.deserialize.arrow.async")
+            .booleanType()
+            .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
+            .key("doris.deserialize.queue.size")
+            .intType()
+            .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
+            .key("doris.batch.size")
+            .intType()
+            .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
+            .key("doris.exec.mem.limit")
+            .longType()
+            .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Boolean> SOURCE_USE_OLD_API = ConfigOptions
+            .key("source.use-old-api")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to read data using the new interface defined according to the FLIP-27 specification, default false");
+
+    // sink config options
+    public static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
+            .key("sink.enable-2pc")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("enable 2PC while loading");
+
+    public static final ConfigOption<Integer> SINK_CHECK_INTERVAL = ConfigOptions
+            .key("sink.check-interval")
+            .intType()
+            .defaultValue(10000)
+            .withDescription("check exception with the interval while loading");
+    public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
+            .key("sink.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if writing records to database failed.");
+    public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
+            .key("sink.buffer-size")
+            .intType()
+            .defaultValue(256 * 1024)
+            .withDescription("the buffer size to cache data for stream load.");
+    public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions
+            .key("sink.buffer-count")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the buffer count to cache data for stream load.");
+    public static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
+            .key("sink.label-prefix")
+            .stringType()
+            .defaultValue("")
+            .withDescription("the unique label prefix.");
+    public static final
ConfigOption<Boolean> SINK_ENABLE_DELETE = ConfigOptions + .key("sink.enable-delete") + .booleanType() + .defaultValue(true) + .withDescription("whether to enable the delete function"); + + // Prefix for Doris StreamLoad specific properties. + public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; + +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index fb44359..81ee23c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -20,7 +20,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; @@ -31,21 +30,37 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.TableSchemaUtils; -import java.time.Duration; import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.Set; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.FENODES; +import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER; +import static 
org.apache.doris.flink.table.DorisConfigOptions.PASSWORD; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_CHECK_INTERVAL; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; +import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; +import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; +import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; +import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME; + /** * The {@link DorisDynamicTableFactory} translates the catalog table to a table source. @@ -55,121 +70,9 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_ */ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { - public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address."); - public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name."); - public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name."); - public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password."); - // Prefix for Doris StreamLoad specific properties. - public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; - // doris options - private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions - .key("doris.read.field") - .stringType() - .noDefaultValue() - .withDescription("List of column names in the Doris table, separated by commas"); - private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions - .key("doris.filter.query") - .stringType() - .noDefaultValue() - .withDescription("Filter expression of the query, which is transparently transmitted to Doris. 
Doris uses this expression to complete source-side data filtering"); - private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions - .key("doris.request.tablet.size") - .intType() - .defaultValue(DORIS_TABLET_SIZE_DEFAULT) - .withDescription(""); - private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions - .key("doris.request.connect.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) - .withDescription(""); - private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions - .key("doris.request.read.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) - .withDescription(""); - private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions - .key("doris.request.query.timeout.s") - .intType() - .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) - .withDescription(""); - private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions - .key("doris.request.retries") - .intType() - .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) - .withDescription(""); - private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions - .key("doris.deserialize.arrow.async") - .booleanType() - .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) - .withDescription(""); - private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions - .key("doris.request.retriesdoris.deserialize.queue.size") - .intType() - .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) - .withDescription(""); - private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions - .key("doris.batch.size") - .intType() - .defaultValue(DORIS_BATCH_SIZE_DEFAULT) - .withDescription(""); - private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions - .key("doris.exec.mem.limit") - .longType() - .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) - .withDescription(""); - // flink write config options - private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions - .key("sink.enable-2pc") - .booleanType() - .defaultValue(true) - .withDescription("enable 2PC while loading"); - - private static final ConfigOption<Integer> SINK_CHECK_INTERVAL = ConfigOptions - .key("sink.check-interval") - .intType() - .defaultValue(10000) - .withDescription("check exception with the interval while loading"); - private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions - .key("sink.max-retries") - .intType() - .defaultValue(3) - .withDescription("the max retry times if writing records to database failed."); - private static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions - .key("sink.buffer-size") - .intType() - .defaultValue(256 * 1024) - .withDescription("the buffer size to cache data for stream load."); - private static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions - .key("sink.buffer-count") - .intType() - .defaultValue(3) - .withDescription("the buffer count to cache data for stream load."); - private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions - .key("sink.label-prefix") - .stringType() - .defaultValue("") - .withDescription("the unique label prefix."); - private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions - .key("sink.batch.interval") - .durationType() - .defaultValue(Duration.ofSeconds(1)) - .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. 
The " + - "default value is 1s."); - private static final ConfigOption<Boolean> SINK_ENABLE_DELETE = ConfigOptions - .key("sink.enable-delete") - .booleanType() - .defaultValue(true) - .withDescription("whether to enable the delete function"); - - private static final ConfigOption<Boolean> SOURCE_USE_OLD_API = ConfigOptions - .key("source.use-old-api") - .booleanType() - .defaultValue(false) - .withDescription("Whether to read data using the new interface defined according to the FLIP-27 specification,default false"); - @Override public String factoryIdentifier() { - return "doris"; // used for matching to `connector = '...'` + return IDENTIFIER; // used for matching to `connector = '...'` } @Override @@ -203,7 +106,6 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(SINK_CHECK_INTERVAL); options.add(SINK_ENABLE_2PC); options.add(SINK_MAX_RETRIES); - options.add(SINK_BUFFER_FLUSH_INTERVAL); options.add(SINK_ENABLE_DELETE); options.add(SINK_LABEL_PREFIX); options.add(SINK_BUFFER_SIZE); diff --git a/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index e625cc7..5863c8b 100644 --- a/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -15,4 +15,5 @@ # specific language governing permissions and limitations # under the License. -org.apache.doris.flink.table.DorisDynamicTableFactory \ No newline at end of file +org.apache.doris.flink.table.DorisDynamicTableFactory +org.apache.doris.flink.catalog.DorisCatalogFactory \ No newline at end of file diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java new file mode 100644 index 0000000..e1452d3 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java new file mode 100644 index 0000000..e1452d3 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.catalog; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CollectionUtil; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Class for unit tests to run on catalogs. */ +@Ignore +public class CatalogTest { + private static final String TEST_CATALOG_NAME = "doris_catalog"; + private static final String TEST_FENODES = "127.0.0.1:8030"; + private static final String TEST_JDBCURL = "jdbc:mysql://127.0.0.1:9030"; + private static final String TEST_USERNAME = "root"; + private static final String TEST_PWD = ""; + private static final String TEST_DB = "test"; + private static final String TEST_TABLE = "t_all_types"; + private static final String TEST_TABLE_SINK = "t_all_types_sink"; + private static final String TEST_TABLE_SINK_GROUPBY = "t_all_types_sink_groupby"; + + protected static final Schema TABLE_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.STRING()) + .column("c_boolean", DataTypes.BOOLEAN()) + .column("c_char", DataTypes.CHAR(1)) + .column("c_date", DataTypes.DATE()) + .column("c_datetime", DataTypes.TIMESTAMP(0)) + .column("c_decimal", DataTypes.DECIMAL(10, 2)) + .column("c_double", DataTypes.DOUBLE()) + .column("c_float", DataTypes.FLOAT()) + .column("c_int", DataTypes.INT()) + .column("c_bigint", DataTypes.BIGINT()) + .column("c_largeint", DataTypes.STRING()) + .column("c_smallint", DataTypes.SMALLINT()) + .column("c_string", DataTypes.STRING()) + .column("c_tinyint", DataTypes.TINYINT()) + .build(); + + private static final List<Row> ALL_TYPES_ROWS = + Lists.newArrayList( + Row.ofKind( + RowKind.INSERT, + "100001", + true, + "a", + Date.valueOf("2022-08-31").toLocalDate(), + Timestamp.valueOf("2022-08-31 11:12:13").toLocalDateTime(), + BigDecimal.valueOf(1.12).setScale(2), + 1.1234d, + 1.1f, + 1234567, + 1234567890L, + "123456790123456790", + Short.parseShort("10"), + "catalog", + Byte.parseByte("1")), + Row.ofKind( + RowKind.INSERT, + "100002", + true, + "a", + Date.valueOf("2022-08-31").toLocalDate(), + Timestamp.valueOf("2022-08-31 11:12:13").toLocalDateTime(), + BigDecimal.valueOf(1.12).setScale(2), + 1.1234d, + 1.1f, + 1234567, + 1234567890L, + "123456790123456790", + Short.parseShort("10"), + "catalog", + Byte.parseByte("1"))); + + private DorisCatalog catalog; + private TableEnvironment tEnv; + + @Before + public void setup() { + Map<String, String> props = new HashMap<>(); + props.put("sink.enable-2pc", "false"); + catalog = new DorisCatalog(TEST_CATALOG_NAME, 
TEST_JDBCURL, TEST_DB, TEST_USERNAME, TEST_PWD, props); + this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + // Use doris catalog. + tEnv.registerCatalog(TEST_CATALOG_NAME, catalog); + tEnv.useCatalog(TEST_CATALOG_NAME); + } + + @Test + public void testQueryFenodes() { + String actual = catalog.queryFenodes(); + assertEquals("127.0.0.1:8030", actual); + } + + @Test + public void testListDatabases() { + List<String> actual = catalog.listDatabases(); + assertEquals(Collections.singletonList(TEST_DB), actual); + } + + @Test + public void testDbExists() throws Exception { + String databaseNotExist = "nonexistent"; + assertFalse(catalog.databaseExists(databaseNotExist)); + assertTrue(catalog.databaseExists(TEST_DB)); + } + + @Test + public void testListTables() throws DatabaseNotExistException { + List<String> actual = catalog.listTables(TEST_DB); + assertEquals( + Arrays.asList( + TEST_TABLE, + TEST_TABLE_SINK, + TEST_TABLE_SINK_GROUPBY), + actual); + } + + @Test + public void testTableExists() { + String tableNotExist = "nonexist"; + assertFalse(catalog.tableExists(new ObjectPath(TEST_DB, tableNotExist))); + } + + @Test + public void testGetTable() throws TableNotExistException { + CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE)); + assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema()); + } + + // ------ test select query. ------ + + @Test + public void testSelectField() { + List<Row> results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select id from %s", TEST_TABLE)) + .execute() + .collect()); + assertEquals( + Lists.newArrayList(Row.ofKind(RowKind.INSERT, "100001"), Row.ofKind(RowKind.INSERT, "100002")), + results); + } + + @Test + public void testWithoutCatalogDB() { + List<Row> results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TEST_TABLE)) + .execute() + .collect()); + assertEquals(ALL_TYPES_ROWS, results); + } + + @Test + public void testWithoutCatalog() { + List<Row> results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`.`%s`", + TEST_DB, TEST_TABLE)) + .execute() + .collect()); + assertEquals(ALL_TYPES_ROWS, results); + } + + @Test + public void testFullPath() { + List<Row> results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from %s.%s.`%s`", + TEST_CATALOG_NAME, + catalog.getDefaultDatabase(), + TEST_TABLE)) + .execute() + .collect()); + assertEquals(ALL_TYPES_ROWS, results); + } + + @Test + public void testSelectToInsert() throws Exception { + + String sql = + String.format( + "insert into `%s` select * from `%s`", + TEST_TABLE_SINK, TEST_TABLE); + tEnv.executeSql(sql).await(); + + List<Row> results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TEST_TABLE_SINK)) + .execute() + .collect()); + assertEquals(ALL_TYPES_ROWS, results); + } + + @Test + public void testGroupByInsert() throws Exception { + // The sink table is keyed on c_string, so the aggregated row is upserted per key. 
+ tEnv.executeSql( + String.format( + "insert into `%s` select `c_string`, max(`id`) `id` from `%s` " + + "group by `c_string` ", + TEST_TABLE_SINK_GROUPBY, TEST_TABLE)) + .await(); + + List<Row> results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`", + TEST_TABLE_SINK_GROUPBY)) + .execute() + .collect()); + assertEquals(Lists.newArrayList(Row.ofKind(RowKind.INSERT, "catalog", "100002")), results); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java index a44b96d..4ab44bf 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java @@ -19,6 +19,7 @@ package org.apache.doris.flink.source.reader; import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; import org.apache.doris.flink.sink.OptionUtils; import org.apache.doris.flink.source.split.DorisSourceSplit; +import org.junit.Ignore; import org.junit.Test; import java.io.IOException; @@ -29,6 +30,7 @@ import static org.junit.Assert.assertEquals; /** * Unit tests for the {@link DorisSourceReader}. */ +@Ignore public class DorisSourceReaderTest { private static DorisSourceReader createReader(TestingReaderContext context) {
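Editor's note: as a companion to CatalogTest above, here is a minimal standalone sketch of wiring the new DorisCatalog into a job. It mirrors the constructor and registration calls used in the test; the JDBC endpoint, credentials, database, and table name are placeholders borrowed from the test fixtures.

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.flink.catalog.DorisCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DorisCatalogSketch {
    public static void main(String[] args) {
        // Properties handed through to tables resolved by the catalog. 2PC is
        // disabled here, as in CatalogTest, so repeated runs do not collide on
        // stream-load transaction labels.
        Map<String, String> props = new HashMap<>();
        props.put("sink.enable-2pc", "false");

        // Same constructor shape as CatalogTest.setup(); all values are placeholders.
        DorisCatalog catalog =
                new DorisCatalog(
                        "doris_catalog",
                        "jdbc:mysql://127.0.0.1:9030",
                        "test",
                        "root",
                        "",
                        props);

        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.registerCatalog("doris_catalog", catalog);
        tEnv.useCatalog("doris_catalog");

        // Tables in the catalog's default database can now be queried without any DDL.
        tEnv.executeSql("SELECT * FROM t_all_types").print();
    }
}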