This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 00b8821  [Feature] support doris catalog (#60)
00b8821 is described below

commit 00b882190e7d9492c6c77c8ce8474ba618be0ed6
Author: wudi <676366...@qq.com>
AuthorDate: Tue Sep 6 16:35:04 2022 +0800

    [Feature] support doris catalog (#60)
    
    * add doris catalog and fix thrift Concurrency bug
---
 .../apache/doris/flink/catalog/DorisCatalog.java   | 526 +++++++++++++++++++++
 .../doris/flink/catalog/DorisCatalogFactory.java   | 124 +++++
 .../doris/flink/catalog/DorisCatalogOptions.java   |  26 +
 .../doris/flink/catalog/DorisTypeMapper.java       |  89 ++++
 .../source/reader/DorisSourceSplitReader.java      |   8 -
 .../flink/source/reader/DorisValueReader.java      |  94 ++--
 .../doris/flink/table/DorisConfigOptions.java      | 145 ++++++
 .../flink/table/DorisDynamicTableFactory.java      | 152 ++----
 .../org.apache.flink.table.factories.Factory       |   3 +-
 .../apache/doris/flink/catalog/CatalogTest.java    | 263 +++++++++++
 .../flink/source/reader/DorisSourceReaderTest.java |   2 +
 11 files changed, 1263 insertions(+), 169 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
new file mode 100644
index 0000000..00f7fb3
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.table.DorisDynamicTableFactory;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.function.Predicate;
+
+import static 
org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Flink catalog that exposes the databases and tables of an Apache Doris cluster.
+ *
+ * <p>Metadata is read over JDBC from Doris' {@code information_schema}. Every call opens a
+ * short-lived connection through {@link DriverManager}; the catalog itself holds no open
+ * connection. Operations that would modify metadata (create/alter/drop of databases, tables,
+ * partitions, functions and statistics), as well as {@link #listViews(String)} and
+ * {@link #partitionExists(ObjectPath, CatalogPartitionSpec)}, throw
+ * {@link UnsupportedOperationException}.
+ */
+public class DorisCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class);
+
+    /** Databases that are filtered out of {@link #listDatabases()}. */
+    private static final Set<String> builtinDatabases =
+            Collections.singleton("information_schema");
+
+    private final String username;
+    private final String password;
+    /** JDBC URL, always ending with "/" so that a database name can be appended to it. */
+    private final String jdbcUrl;
+    /** Read-only snapshot of the catalog options; used as the base of every table's options. */
+    private final Map<String, String> properties;
+
+    /**
+     * Creates the catalog without connecting to Doris; see {@link #open()}.
+     *
+     * @param catalogName name of the catalog
+     * @param jdbcUrl JDBC URL of Doris; a trailing "/" is appended when it is missing
+     * @param defaultDatabase default database handed to {@link AbstractCatalog}
+     * @param username user for the JDBC connections
+     * @param password password for the JDBC connections
+     * @param properties catalog options that are copied into the options of every table
+     * @throws IllegalArgumentException if {@code jdbcUrl} or {@code username} is null or blank
+     */
+    public DorisCatalog(
+            String catalogName,
+            String jdbcUrl,
+            String defaultDatabase,
+            String username,
+            String password,
+            Map<String, String> properties) {
+        super(catalogName, defaultDatabase);
+
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(jdbcUrl), "jdbc-url cannot be null or empty");
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(username), "username cannot be null or empty");
+
+        this.jdbcUrl = jdbcUrl.endsWith("/") ? jdbcUrl : jdbcUrl + "/";
+        this.username = username;
+        this.password = password;
+        // Copy before wrapping so that later changes to the caller's map cannot leak in.
+        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
+    }
+
+    /**
+     * Verifies that Doris is reachable by opening and immediately closing one JDBC connection.
+     *
+     * @throws ValidationException if the connection cannot be established
+     */
+    @Override
+    public void open() throws CatalogException {
+        // test connection, fail early if we cannot connect to database
+        try (Connection ignored = DriverManager.getConnection(jdbcUrl, username, password)) {
+            // Nothing to execute: successfully opening the connection is the whole check.
+        } catch (SQLException e) {
+            throw new ValidationException(
+                    String.format("Failed connecting to %s via JDBC.", jdbcUrl), e);
+        }
+
+        LOG.info("Catalog {} established connection to {}", getName(), jdbcUrl);
+    }
+
+    /** Logs that the catalog was closed; there are no resources to release. */
+    @Override
+    public synchronized void close() throws CatalogException {
+        LOG.info("Closed catalog {} ", getName());
+    }
+
+    /** Returns a new {@link DorisDynamicTableFactory} for the tables of this catalog. */
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(new DorisDynamicTableFactory());
+    }
+
+    // ------------- databases -------------
+
+    /** Lists all schemata found in {@code information_schema}, minus the built-in databases. */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return extractColumnValuesBySQL(
+                jdbcUrl,
+                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
+                1,
+                dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    /**
+     * Returns an empty database descriptor (no properties, no comment) for an existing database.
+     *
+     * @throws DatabaseNotExistException if {@link #listDatabases()} does not contain the name
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    /**
+     * Checks whether {@link #listDatabases()} contains the given name.
+     *
+     * @throws IllegalArgumentException if {@code databaseName} is null or blank
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+        return listDatabases().contains(databaseName);
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotEmptyException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------------- tables -------------
+
+    /**
+     * Lists the table names that {@code information_schema} reports for the given database.
+     *
+     * @throws IllegalStateException if {@code databaseName} is blank
+     * @throws DatabaseNotExistException if the database does not exist
+     */
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                org.apache.commons.lang3.StringUtils.isNotBlank(databaseName),
+                "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return extractColumnValuesBySQL(
+                jdbcUrl + databaseName,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public List<String> listViews(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Builds the Flink table for a Doris table: the schema comes from
+     * {@code information_schema}, the options from the catalog properties plus the connector
+     * identifier, credentials, table identifier and a table-specific sink label prefix.
+     *
+     * @throws TableNotExistException if the table (or its database) does not exist
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        String databaseName = tablePath.getDatabaseName();
+        String tableName = tablePath.getObjectName();
+        Map<String, String> props = new HashMap<>(properties);
+        props.put(CONNECTOR.key(), IDENTIFIER);
+        // Only ask Doris for its frontends when the user did not configure them.
+        if (!props.containsKey(FENODES.key())) {
+            props.put(FENODES.key(), queryFenodes());
+        }
+        props.put(USERNAME.key(), username);
+        props.put(PASSWORD.key(), password);
+        props.put(TABLE_IDENTIFIER.key(), databaseName + "." + tableName);
+
+        // Extend the configured label prefix (or "") with database and table name.
+        String labelPrefix = props.getOrDefault(SINK_LABEL_PREFIX.key(), "");
+        props.put(
+                SINK_LABEL_PREFIX.key(),
+                String.join("_", labelPrefix, databaseName, tableName));
+        // remove catalog option
+        props.remove(JDBCURL.key());
+        props.remove(DEFAULT_DATABASE.key());
+        return CatalogTable.of(
+                createTableSchema(databaseName, tableName), null, Lists.newArrayList(), props);
+    }
+
+    /**
+     * Runs {@code SHOW FRONTENDS} and joins the {@code IP:HttpPort} pair of every returned row
+     * with commas.
+     *
+     * @throws CatalogException if the query fails
+     */
+    @VisibleForTesting
+    protected String queryFenodes() {
+        // Statement and result set are closed together with the connection.
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
+                PreparedStatement ps = conn.prepareStatement("SHOW FRONTENDS");
+                ResultSet resultSet = ps.executeQuery()) {
+            StringJoiner fenodes = new StringJoiner(",");
+            while (resultSet.next()) {
+                String ip = resultSet.getString("IP");
+                String port = resultSet.getString("HttpPort");
+                fenodes.add(ip + ":" + port);
+            }
+            return fenodes.toString();
+        } catch (Exception e) {
+            throw new CatalogException("Failed getting fenodes", e);
+        }
+    }
+
+    /**
+     * Reads the columns of a table from {@code information_schema.COLUMNS} and maps each one to
+     * a Flink type through {@link DorisTypeMapper}.
+     *
+     * @throws CatalogException if the metadata cannot be read or a type cannot be mapped
+     */
+    private Schema createTableSchema(String databaseName, String tableName) {
+        // Names are bound as parameters (as in listTables) instead of being formatted into the
+        // SQL text, so quotes in a name cannot change the statement.
+        final String sql =
+                "SELECT COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM "
+                        + "`information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= ? AND `TABLE_NAME`= ?";
+        String dbUrl = jdbcUrl + databaseName;
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, password);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            ps.setString(1, databaseName);
+            ps.setString(2, tableName);
+
+            List<String> columnNames = new ArrayList<>();
+            List<DataType> columnTypes = new ArrayList<>();
+            try (ResultSet resultSet = ps.executeQuery()) {
+                while (resultSet.next()) {
+                    String columnName = resultSet.getString("COLUMN_NAME");
+                    String columnType = resultSet.getString("DATA_TYPE");
+                    long columnSize = resultSet.getLong("COLUMN_SIZE");
+                    long columnDigit = resultSet.getLong("DECIMAL_DIGITS");
+                    DataType flinkType =
+                            DorisTypeMapper.toFlinkType(
+                                    columnName, columnType, (int) columnSize, (int) columnDigit);
+                    columnNames.add(columnName);
+                    columnTypes.add(flinkType);
+                }
+            }
+            return Schema.newBuilder().fromFields(columnNames, columnTypes).build();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed getting catalog %s database %s table %s",
+                            getName(), databaseName, tableName),
+                    e);
+        }
+    }
+
+    /** Returns true when the database exists and its table list contains the table name. */
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                    && listTables(tablePath.getDatabaseName())
+                            .contains(tablePath.getObjectName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------------- partitions -------------
+
+    /** Partitions are not exposed; always returns an empty list. */
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    /** Partitions are not exposed; always returns an empty list. */
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException,
+            PartitionSpecInvalidException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    /** Partitions are not exposed; always returns an empty list. */
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    /** Partitions are not exposed; always throws {@link PartitionNotExistException}. */
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws TableNotExistException, TableNotPartitionedException,
+            PartitionSpecInvalidException, PartitionAlreadyExistsException,
+            CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------------- functions -------------
+
+    /** Functions are not exposed; always returns an empty list. */
+    @Override
+    public List<String> listFunctions(String dbName)
+            throws DatabaseNotExistException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    /** Functions are not exposed; always throws {@link FunctionNotExistException}. */
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath)
+            throws FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    /** Functions are not exposed; always returns {@code false}. */
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        return false;
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void createFunction(
+            ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void alterFunction(
+            ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------------- statistics -------------
+
+    /** Statistics are not collected; always returns {@link CatalogTableStatistics#UNKNOWN}. */
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    /** Statistics are not collected; always returns {@link CatalogColumnStatistics#UNKNOWN}. */
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    /** Statistics are not collected; always returns {@link CatalogTableStatistics#UNKNOWN}. */
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    /** Statistics are not collected; always returns {@link CatalogColumnStatistics#UNKNOWN}. */
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void alterTableStatistics(
+            ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void alterTableColumnStatistics(
+            ObjectPath tablePath,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException, TablePartitionedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void alterPartitionStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void alterPartitionColumnStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Executes a query on a fresh connection and collects one column of the result as strings.
+     *
+     * @param connUrl JDBC URL to connect to
+     * @param sql statement text; may contain {@code ?} placeholders
+     * @param columnIndex 1-based index of the result column to read
+     * @param filterFunc keeps a value when it returns true; {@code null} keeps every value
+     * @param params values bound to the placeholders, in order
+     * @return the selected values in result order
+     * @throws CatalogException if connecting or executing the query fails
+     */
+    private List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            Predicate<String> filterFunc,
+            Object... params) {
+
+        List<String> columnValues = Lists.newArrayList();
+
+        try (Connection conn = DriverManager.getConnection(connUrl, username, password);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    // JDBC parameter indexes start at 1.
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            try (ResultSet rs = ps.executeQuery()) {
+                while (rs.next()) {
+                    String columnValue = rs.getString(columnIndex);
+                    if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+                        columnValues.add(columnValue);
+                    }
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "The following SQL query could not be executed (%s): %s", connUrl, sql),
+                    e);
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
new file mode 100644
index 0000000..f23f0dc
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_CHECK_INTERVAL;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+
+/**
+ * Factory for {@link DorisCatalog}.
+ *
+ * <p>Registered under the identifier {@code IDENTIFIER} of {@code DorisConfigOptions}. Besides
+ * the connection options it accepts the connector's source and sink options so that they can be
+ * set once on the catalog.
+ */
+public class DorisCatalogFactory implements CatalogFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Options that must be present: JDBC URL, user name and password. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(JDBCURL);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        return options;
+    }
+
+    /**
+     * Options that may be present in addition to {@link #requiredOptions()}; the required
+     * options are deliberately not repeated here.
+     */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+
+        options.add(FENODES);
+        options.add(TABLE_IDENTIFIER);
+
+        options.add(DORIS_READ_FIELD);
+        options.add(DORIS_FILTER_QUERY);
+        options.add(DORIS_TABLET_SIZE);
+        options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
+        options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
+        options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
+        options.add(DORIS_REQUEST_RETRIES);
+        options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
+        options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
+        options.add(DORIS_BATCH_SIZE);
+        options.add(DORIS_EXEC_MEM_LIMIT);
+
+        options.add(SINK_CHECK_INTERVAL);
+        options.add(SINK_ENABLE_2PC);
+        options.add(SINK_MAX_RETRIES);
+        options.add(SINK_ENABLE_DELETE);
+        options.add(SINK_LABEL_PREFIX);
+        options.add(SINK_BUFFER_SIZE);
+        options.add(SINK_BUFFER_COUNT);
+
+        options.add(SOURCE_USE_OLD_API);
+        return options;
+    }
+
+    /**
+     * Validates the catalog options and creates the {@link DorisCatalog}.
+     *
+     * <p>Keys starting with {@code STREAM_LOAD_PROP_PREFIX} are excluded from validation. The
+     * complete option map is handed to the catalog.
+     */
+    @Override
+    public Catalog createCatalog(Context context) {
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
+
+        return new DorisCatalog(
+                context.getName(),
+                helper.getOptions().get(JDBCURL),
+                helper.getOptions().get(DEFAULT_DATABASE),
+                helper.getOptions().get(USERNAME),
+                helper.getOptions().get(PASSWORD),
+                ((Configuration) helper.getOptions()).toMap());
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
new file mode 100644
index 0000000..ff87f9b
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+public class DorisCatalogOptions {
+    public static final ConfigOption<String> JDBCURL = 
ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris
 jdbc url.");
+    public static final ConfigOption<String> DEFAULT_DATABASE = 
ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY).stringType().noDefaultValue();
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
new file mode 100644
index 0000000..f7a16ce
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * Translates Doris column types into Flink {@link DataType}s.
+ */
+public class DorisTypeMapper {
+
+    // -------------------------number----------------------------
+    private static final String DORIS_TINYINT = "TINYINT";
+    private static final String DORIS_SMALLINT = "SMALLINT";
+    private static final String DORIS_INT = "INT";
+    private static final String DORIS_BIGINT = "BIGINT";
+    private static final String DORIS_LARGEINT = "BIGINT UNSIGNED";
+    private static final String DORIS_DECIMAL = "DECIMAL";
+    private static final String DORIS_FLOAT = "FLOAT";
+    private static final String DORIS_DOUBLE = "DOUBLE";
+
+    // -------------------------string----------------------------
+    private static final String DORIS_CHAR = "CHAR";
+    private static final String DORIS_VARCHAR = "VARCHAR";
+    private static final String DORIS_STRING = "STRING";
+    private static final String DORIS_TEXT = "TEXT";
+
+    // ------------------------------time-------------------------
+    private static final String DORIS_DATE = "DATE";
+    private static final String DORIS_DATETIME = "DATETIME";
+
+    //------------------------------bool------------------------
+    private static final String DORIS_BOOLEAN = "BOOLEAN";
+
+    /**
+     * Returns the Flink data type that corresponds to a Doris column type.
+     *
+     * <p>The type name is matched case-insensitively. {@code BIGINT UNSIGNED}, {@code VARCHAR},
+     * {@code STRING} and {@code TEXT} all map to {@code STRING}; {@code DATETIME} maps to
+     * {@code TIMESTAMP(0)}.
+     *
+     * @param columnName name of the column; only used in the error message
+     * @param columnType Doris type name of the column
+     * @param precision  precision of the column; used for {@code DECIMAL} and {@code CHAR}, and
+     *                   to tell a boolean column reported as {@code TINYINT} from a real one
+     * @param scale      scale of the column; used for {@code DECIMAL}
+     * @return the matching Flink data type
+     * @throws UnsupportedOperationException if the Doris type has no mapping
+     */
+    public static DataType toFlinkType(String columnName, String columnType, int precision, int scale) {
+        // Upper-case with a fixed locale: the default-locale variant turns "int" into "İNT" under
+        // a Turkish locale, which would then miss every case label below.
+        final String dorisType = columnType.toUpperCase(java.util.Locale.ROOT);
+        switch (dorisType) {
+            case DORIS_BOOLEAN:
+                return DataTypes.BOOLEAN();
+            case DORIS_TINYINT:
+                if (precision == 0) {
+                    //The boolean type will become tinyint when queried in information_schema, and precision=0
+                    return DataTypes.BOOLEAN();
+                } else {
+                    return DataTypes.TINYINT();
+                }
+            case DORIS_SMALLINT:
+                return DataTypes.SMALLINT();
+            case DORIS_INT:
+                return DataTypes.INT();
+            case DORIS_BIGINT:
+                return DataTypes.BIGINT();
+            case DORIS_DECIMAL:
+                return DataTypes.DECIMAL(precision, scale);
+            case DORIS_FLOAT:
+                return DataTypes.FLOAT();
+            case DORIS_DOUBLE:
+                return DataTypes.DOUBLE();
+            case DORIS_CHAR:
+                return DataTypes.CHAR(precision);
+            case DORIS_LARGEINT:
+            case DORIS_VARCHAR:
+            case DORIS_STRING:
+            case DORIS_TEXT:
+                return DataTypes.STRING();
+            case DORIS_DATE:
+                return DataTypes.DATE();
+            case DORIS_DATETIME:
+                return DataTypes.TIMESTAMP(0);
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support Doris type '%s' on column '%s'", dorisType, columnName));
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
index c5d33f7..b12bf2a 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
@@ -74,14 +74,6 @@ public class DorisSourceSplitReader
     }
 
     private DorisSplitRecords finishSplit() {
-        if (valueReader != null) {
-            try {
-                valueReader.close();
-            } catch (Exception e) {
-                LOG.error("close resource reader failed,", e);
-            }
-            valueReader = null;
-        }
         final DorisSplitRecords finishRecords = 
DorisSplitRecords.finishedSplit(currentSplitId);
         currentSplitId = null;
         return finishRecords;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index 173ea90..04474f0 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -41,6 +41,8 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER;
@@ -53,6 +55,8 @@ import static 
org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAG
 public class DorisValueReader implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisValueReader.class);
     protected BackendClient client;
+    protected Lock clientLock = new ReentrantLock();
+
     private PartitionDefinition partition;
     private DorisOptions options;
     private DorisReadOptions readOptions;
@@ -85,10 +89,15 @@ public class DorisValueReader implements AutoCloseable {
     }
 
     private void init() {
-        this.openParams = openParams();
-        TScanOpenResult openResult = this.client.openScanner(this.openParams);
-        this.contextId = openResult.getContextId();
-        this.schema = 
SchemaUtils.convertToSchema(openResult.getSelectedColumns());
+        // Open the scanner under clientLock, the same lock the other client calls in this class take,
+        // so the open request cannot overlap with them.
+        clientLock.lock();
+        try {
+            this.openParams = openParams();
+            TScanOpenResult openResult = 
this.client.openScanner(this.openParams);
+            this.contextId = openResult.getContextId();
+            this.schema = 
SchemaUtils.convertToSchema(openResult.getSelectedColumns());
+        } finally {
+            clientLock.unlock();
+        }
         this.asyncThreadStarted = asyncThreadStarted();
         LOG.debug("Open scan result is, contextId: {}, schema: {}.", 
contextId, schema);
     }
@@ -127,22 +136,27 @@ public class DorisValueReader implements AutoCloseable {
     protected Thread asyncThread = new Thread(new Runnable() {
         @Override
         public void run() {
-            TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
-            nextBatchParams.setContextId(contextId);
-            while (!eos.get()) {
-                nextBatchParams.setOffset(offset);
-                TScanBatchResult nextResult = client.getNext(nextBatchParams);
-                eos.set(nextResult.isEos());
-                if (!eos.get()) {
-                    RowBatch rowBatch = new RowBatch(nextResult, 
schema).readArrow();
-                    offset += rowBatch.getReadRowCount();
-                    rowBatch.close();
-                    try {
-                        rowBatchBlockingQueue.put(rowBatch);
-                    } catch (InterruptedException e) {
-                        throw new DorisRuntimeException(e);
-                    }
+            // Fetch batches with client.getNext until the backend reports eos and hand each decoded
+            // RowBatch to rowBatchBlockingQueue. clientLock is held for the whole loop, so close()
+            // waits until this thread has finished.
+            clientLock.lock();
+            try {
+                TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
+                nextBatchParams.setContextId(contextId);
+                while (!eos.get()) {
+                    nextBatchParams.setOffset(offset);
+                    TScanBatchResult nextResult = client.getNext(nextBatchParams);
+                    eos.set(nextResult.isEos());
+                    if (!eos.get()) {
+                        RowBatch rowBatch = new RowBatch(nextResult, schema).readArrow();
+                        offset += rowBatch.getReadRowCount();
+                        // NOTE(review): the batch is closed before it is queued - confirm that
+                        // consumers can still read a closed RowBatch.
+                        rowBatch.close();
+                        try {
+                            rowBatchBlockingQueue.put(rowBatch);
+                        } catch (InterruptedException e) {
+                            // Restore the interrupt status before propagating, so code further up
+                            // the stack can still observe the interruption.
+                            Thread.currentThread().interrupt();
+                            throw new DorisRuntimeException(e);
+                        }
                     }
                 }
+            } finally {
+                clientLock.unlock();
             }
         }
     });
@@ -187,22 +201,27 @@ public class DorisValueReader implements AutoCloseable {
                 hasNext = true;
             }
         } else {
-            // Arrow data was acquired synchronously during the iterative 
process
-            if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
-                if (rowBatch != null) {
-                    offset += rowBatch.getReadRowCount();
-                    rowBatch.close();
-                }
-                TScanNextBatchParams nextBatchParams = new 
TScanNextBatchParams();
-                nextBatchParams.setContextId(contextId);
-                nextBatchParams.setOffset(offset);
-                TScanBatchResult nextResult = client.getNext(nextBatchParams);
-                eos.set(nextResult.isEos());
-                if (!eos.get()) {
-                    rowBatch = new RowBatch(nextResult, schema).readArrow();
+            clientLock.lock();
+            try{
+                // Arrow data was acquired synchronously during the iterative 
process
+                if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
+                    if (rowBatch != null) {
+                        offset += rowBatch.getReadRowCount();
+                        rowBatch.close();
+                    }
+                    TScanNextBatchParams nextBatchParams = new 
TScanNextBatchParams();
+                    nextBatchParams.setContextId(contextId);
+                    nextBatchParams.setOffset(offset);
+                    TScanBatchResult nextResult = 
client.getNext(nextBatchParams);
+                    eos.set(nextResult.isEos());
+                    if (!eos.get()) {
+                        rowBatch = new RowBatch(nextResult, 
schema).readArrow();
+                    }
                 }
+                hasNext = !eos.get();
+            } finally {
+                clientLock.unlock();
             }
-            hasNext = !eos.get();
         }
         return hasNext;
     }
@@ -222,8 +241,13 @@ public class DorisValueReader implements AutoCloseable {
 
     @Override
     public void close() throws Exception {
-        TScanCloseParams closeParams = new TScanCloseParams();
-        closeParams.setContextId(contextId);
-        client.closeScanner(closeParams);
+        // Close the scanner identified by contextId. clientLock is taken first, so this waits for
+        // any client call that currently holds the lock.
+        clientLock.lock();
+        try {
+            TScanCloseParams closeParams = new TScanCloseParams();
+            closeParams.setContextId(contextId);
+            client.closeScanner(closeParams);
+        } finally {
+            clientLock.unlock();
+        }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
new file mode 100644
index 0000000..5b56342
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+
+/**
+ * Option definitions of the Doris connector: the connector identifier, the common connection
+ * options, the source (read) options and the sink (write) options.
+ */
+public class DorisConfigOptions {
+
+    /** Connector identifier, the value of {@code connector = '...'}. */
+    public static final String IDENTIFIER = "doris";
+    // common option
+    public static final ConfigOption<String> FENODES = ConfigOptions
+            .key("fenodes")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("doris fe http address.");
+    public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions
+            .key("table.identifier")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the jdbc table name.");
+    public static final ConfigOption<String> USERNAME = ConfigOptions
+            .key("username")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the jdbc user name.");
+    public static final ConfigOption<String> PASSWORD = ConfigOptions
+            .key("password")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the jdbc password.");
+
+    // source config options
+    public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
+            .key("doris.read.field")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("List of column names in the Doris table, separated by commas");
+    public static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
+            .key("doris.filter.query")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
+    public static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
+            .key("doris.request.tablet.size")
+            .intType()
+            .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
+            .key("doris.request.connect.timeout.ms")
+            .intType()
+            .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
+            .key("doris.request.read.timeout.ms")
+            .intType()
+            .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
+            .key("doris.request.query.timeout.s")
+            .intType()
+            .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
+            .key("doris.request.retries")
+            .intType()
+            .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
+            .key("doris.deserialize.arrow.async")
+            .booleanType()
+            .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+            .withDescription("");
+    // This option used to be registered under "doris.request.retriesdoris.deserialize.queue.size",
+    // a copy-paste slip. The old key stays accepted as a deprecated key so that existing table
+    // definitions keep working.
+    public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
+            .key("doris.deserialize.queue.size")
+            .intType()
+            .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+            .withDeprecatedKeys("doris.request.retriesdoris.deserialize.queue.size")
+            .withDescription("");
+    public static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
+            .key("doris.batch.size")
+            .intType()
+            .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+            .withDescription("");
+    public static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
+            .key("doris.exec.mem.limit")
+            .longType()
+            .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+            .withDescription("");
+    // The description used to say "using the new interface", the opposite of what the key means.
+    public static final ConfigOption<Boolean> SOURCE_USE_OLD_API = ConfigOptions
+            .key("source.use-old-api")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to read data using the old source API instead of the new interface defined according to the FLIP-27 specification, default false");
+
+    // sink config options
+    public static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
+            .key("sink.enable-2pc")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("enable 2PC while loading");
+
+    public static final ConfigOption<Integer> SINK_CHECK_INTERVAL = ConfigOptions
+            .key("sink.check-interval")
+            .intType()
+            .defaultValue(10000)
+            .withDescription("check exception with the interval while loading");
+    public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
+            .key("sink.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if writing records to database failed.");
+    public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
+            .key("sink.buffer-size")
+            .intType()
+            .defaultValue(256 * 1024)
+            .withDescription("the buffer size to cache data for stream load.");
+    public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions
+            .key("sink.buffer-count")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the buffer count to cache data for stream load.");
+    public static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
+            .key("sink.label-prefix")
+            .stringType()
+            .defaultValue("")
+            .withDescription("the unique label prefix.");
+    public static final ConfigOption<Boolean> SINK_ENABLE_DELETE = ConfigOptions
+            .key("sink.enable-delete")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("whether to enable the delete function");
+
+    // Prefix for Doris StreamLoad specific properties.
+    public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index fb44359..81ee23c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -20,7 +20,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -31,21 +30,37 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
-import java.time.Duration;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
-import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_CHECK_INTERVAL;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+
 
 /**
  * The {@link DorisDynamicTableFactory} translates the catalog table to a 
table source.
@@ -55,121 +70,9 @@ import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_
  */
 public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
 
-    public static final ConfigOption<String> FENODES = 
ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris
 fe http address.");
-    public static final ConfigOption<String> TABLE_IDENTIFIER = 
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
 jdbc table name.");
-    public static final ConfigOption<String> USERNAME = 
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
 jdbc user name.");
-    public static final ConfigOption<String> PASSWORD = 
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
 jdbc password.");
-    // Prefix for Doris StreamLoad specific properties.
-    public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
-    // doris options
-    private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
-            .key("doris.read.field")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("List of column names in the Doris table, 
separated by commas");
-    private static final ConfigOption<String> DORIS_FILTER_QUERY = 
ConfigOptions
-            .key("doris.filter.query")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("Filter expression of the query, which is 
transparently transmitted to Doris. Doris uses this expression to complete 
source-side data filtering");
-    private static final ConfigOption<Integer> DORIS_TABLET_SIZE = 
ConfigOptions
-            .key("doris.request.tablet.size")
-            .intType()
-            .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
-            .withDescription("");
-    private static final ConfigOption<Integer> 
DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
-            .key("doris.request.connect.timeout.ms")
-            .intType()
-            .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
-            .withDescription("");
-    private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = 
ConfigOptions
-            .key("doris.request.read.timeout.ms")
-            .intType()
-            .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
-            .withDescription("");
-    private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = 
ConfigOptions
-            .key("doris.request.query.timeout.s")
-            .intType()
-            .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
-            .withDescription("");
-    private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = 
ConfigOptions
-            .key("doris.request.retries")
-            .intType()
-            .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
-            .withDescription("");
-    private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = 
ConfigOptions
-            .key("doris.deserialize.arrow.async")
-            .booleanType()
-            .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
-            .withDescription("");
-    private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = 
ConfigOptions
-            .key("doris.request.retriesdoris.deserialize.queue.size")
-            .intType()
-            .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
-            .withDescription("");
-    private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
-            .key("doris.batch.size")
-            .intType()
-            .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
-            .withDescription("");
-    private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = 
ConfigOptions
-            .key("doris.exec.mem.limit")
-            .longType()
-            .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
-            .withDescription("");
-    // flink write config options
-    private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
-            .key("sink.enable-2pc")
-            .booleanType()
-            .defaultValue(true)
-            .withDescription("enable 2PC while loading");
-
-    private static final ConfigOption<Integer> SINK_CHECK_INTERVAL = 
ConfigOptions
-            .key("sink.check-interval")
-            .intType()
-            .defaultValue(10000)
-            .withDescription("check exception with the interval while 
loading");
-    private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
-            .key("sink.max-retries")
-            .intType()
-            .defaultValue(3)
-            .withDescription("the max retry times if writing records to 
database failed.");
-    private static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
-            .key("sink.buffer-size")
-            .intType()
-            .defaultValue(256 * 1024)
-            .withDescription("the buffer size to cache data for stream load.");
-    private static final ConfigOption<Integer> SINK_BUFFER_COUNT = 
ConfigOptions
-            .key("sink.buffer-count")
-            .intType()
-            .defaultValue(3)
-            .withDescription("the buffer count to cache data for stream 
load.");
-    private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
-            .key("sink.label-prefix")
-            .stringType()
-            .defaultValue("")
-            .withDescription("the unique label prefix.");
-    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = 
ConfigOptions
-            .key("sink.batch.interval")
-            .durationType()
-            .defaultValue(Duration.ofSeconds(1))
-            .withDescription("the flush interval mills, over this time, 
asynchronous threads will flush data. The " +
-                    "default value is 1s.");
-    private static final ConfigOption<Boolean> SINK_ENABLE_DELETE = 
ConfigOptions
-            .key("sink.enable-delete")
-            .booleanType()
-            .defaultValue(true)
-            .withDescription("whether to enable the delete function");
-
-    private static final ConfigOption<Boolean> SOURCE_USE_OLD_API = 
ConfigOptions
-            .key("source.use-old-api")
-            .booleanType()
-            .defaultValue(false)
-            .withDescription("Whether to read data using the new interface 
defined according to the FLIP-27 specification,default false");
-
     @Override
     public String factoryIdentifier() {
-        return "doris"; // used for matching to `connector = '...'`
+        // IDENTIFIER is the constant statically imported from DorisConfigOptions.
+        return IDENTIFIER; // used for matching to `connector = '...'`
     }
 
     @Override
@@ -203,7 +106,6 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(SINK_CHECK_INTERVAL);
         options.add(SINK_ENABLE_2PC);
         options.add(SINK_MAX_RETRIES);
-        options.add(SINK_BUFFER_FLUSH_INTERVAL);
         options.add(SINK_ENABLE_DELETE);
         options.add(SINK_LABEL_PREFIX);
         options.add(SINK_BUFFER_SIZE);
diff --git 
a/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index e625cc7..5863c8b 100644
--- 
a/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,4 +15,5 @@
 # specific language governing permissions and limitations
 # under the License.
 
-org.apache.doris.flink.table.DorisDynamicTableFactory
\ No newline at end of file
+org.apache.doris.flink.table.DorisDynamicTableFactory
+org.apache.doris.flink.catalog.DorisCatalogFactory
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
new file mode 100644
index 0000000..e1452d3
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DorisCatalog}: metadata lookups, select queries and insert jobs.
+ *
+ * <p>The class is {@link Ignore}d. Every test goes through a catalog built from the
+ * {@code TEST_*} connection constants below and asserts against the fixture tables
+ * {@code t_all_types}, {@code t_all_types_sink} and {@code t_all_types_sink_groupby} in
+ * database {@code test}.
+ *
+ * <p>NOTE(review): presumably the tests need a Doris instance reachable at those addresses,
+ * pre-populated with the rows in {@code ALL_TYPES_ROWS} - confirm before enabling.
+ */
+@Ignore
+public class CatalogTest {
+    private static final String TEST_CATALOG_NAME = "doris_catalog";
+    /** FE address that {@link DorisCatalog#queryFenodes()} is expected to report. */
+    private static final String TEST_FENODES = "127.0.0.1:8030";
+    // Same host as TEST_FENODES; the previous value "127.0.0.1.78" was not a valid address.
+    private static final String TEST_JDBCURL = "jdbc:mysql://127.0.0.1:9030";
+    private static final String TEST_USERNAME = "root";
+    private static final String TEST_PWD = "";
+    private static final String TEST_DB = "test";
+    private static final String TEST_TABLE = "t_all_types";
+    private static final String TEST_TABLE_SINK = "t_all_types_sink";
+    private static final String TEST_TABLE_SINK_GROUPBY = "t_all_types_sink_groupby";
+
+    /** Schema that {@link #testGetTable()} expects the catalog to derive for {@code t_all_types}. */
+    protected static final Schema TABLE_SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.STRING())
+                    .column("c_boolean", DataTypes.BOOLEAN())
+                    .column("c_char", DataTypes.CHAR(1))
+                    .column("c_date", DataTypes.DATE())
+                    .column("c_datetime", DataTypes.TIMESTAMP(0))
+                    .column("c_decimal", DataTypes.DECIMAL(10, 2))
+                    .column("c_double", DataTypes.DOUBLE())
+                    .column("c_float", DataTypes.FLOAT())
+                    .column("c_int", DataTypes.INT())
+                    .column("c_bigint", DataTypes.BIGINT())
+                    .column("c_largeint", DataTypes.STRING())
+                    .column("c_smallint", DataTypes.SMALLINT())
+                    .column("c_string", DataTypes.STRING())
+                    .column("c_tinyint", DataTypes.TINYINT())
+                    .build();
+
+    /** Rows the select tests expect back from {@code t_all_types}; they differ only in {@code id}. */
+    private static final List<Row> ALL_TYPES_ROWS =
+            Lists.newArrayList(allTypesRow("100001"), allTypesRow("100002"));
+
+    private DorisCatalog catalog;
+    private TableEnvironment tEnv;
+
+    /**
+     * Builds one expected {@code t_all_types} row, in {@link #TABLE_SCHEMA} column order.
+     *
+     * @param id value of the {@code id} column; every other column is the same for all rows
+     * @return an {@link RowKind#INSERT} row
+     */
+    private static Row allTypesRow(String id) {
+        return Row.ofKind(
+                RowKind.INSERT,
+                id,
+                true,
+                "a",
+                Date.valueOf("2022-08-31").toLocalDate(),
+                Timestamp.valueOf("2022-08-31 11:12:13").toLocalDateTime(),
+                BigDecimal.valueOf(1.12).setScale(2),
+                1.1234d,
+                1.1f,
+                1234567,
+                1234567890L,
+                "123456790123456790",
+                Short.parseShort("10"),
+                "catalog",
+                Byte.parseByte("1"));
+    }
+
+    /**
+     * Collects every row produced by {@code sql} when run through the test table environment.
+     *
+     * @param sql a select statement resolved against the current catalog and database
+     * @return the result rows, in the order the result iterator yields them
+     */
+    private List<Row> query(String sql) {
+        return CollectionUtil.iteratorToList(tEnv.sqlQuery(sql).execute().collect());
+    }
+
+    /** Creates the catalog and a streaming table environment that uses it as current catalog. */
+    @Before
+    public void setup() {
+        Map<String, String> props = new HashMap<>();
+        props.put("sink.enable-2pc", "false");
+        catalog =
+                new DorisCatalog(
+                        TEST_CATALOG_NAME, TEST_JDBCURL, TEST_DB, TEST_USERNAME, TEST_PWD, props);
+        this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        // Use doris catalog.
+        tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+        tEnv.useCatalog(TEST_CATALOG_NAME);
+    }
+
+    @Test
+    public void testQueryFenodes() {
+        String actual = catalog.queryFenodes();
+        assertEquals(TEST_FENODES, actual);
+    }
+
+    @Test
+    public void testListDatabases() {
+        List<String> actual = catalog.listDatabases();
+        assertEquals(Collections.singletonList(TEST_DB), actual);
+    }
+
+    @Test
+    public void testDbExists() throws Exception {
+        String databaseNotExist = "nonexistent";
+        assertFalse(catalog.databaseExists(databaseNotExist));
+        assertTrue(catalog.databaseExists(TEST_DB));
+    }
+
+    @Test
+    public void testListTables() throws DatabaseNotExistException {
+        List<String> actual = catalog.listTables(TEST_DB);
+        assertEquals(
+                Arrays.asList(TEST_TABLE, TEST_TABLE_SINK, TEST_TABLE_SINK_GROUPBY),
+                actual);
+    }
+
+    @Test
+    public void testTableExists() {
+        String tableNotExist = "nonexist";
+        assertFalse(catalog.tableExists(new ObjectPath(TEST_DB, tableNotExist)));
+        // Positive case, mirroring testDbExists; the other tests already rely on this table.
+        assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, TEST_TABLE)));
+    }
+
+    @Test
+    public void testGetTable() throws TableNotExistException {
+        CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE));
+        assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
+    }
+
+    // ------ test select query. ------
+
+    @Test
+    public void testSelectField() {
+        List<Row> results = query(String.format("select id from %s", TEST_TABLE));
+        assertEquals(
+                Lists.newArrayList(
+                        Row.ofKind(RowKind.INSERT, "100001"),
+                        Row.ofKind(RowKind.INSERT, "100002")),
+                results);
+    }
+
+    /** Table referenced by bare name: current catalog and database are both implied. */
+    @Test
+    public void testWithoutCatalogDB() {
+        List<Row> results = query(String.format("select * from %s", TEST_TABLE));
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    /** Table referenced as {@code db.table}: only the current catalog is implied. */
+    @Test
+    public void testWithoutCatalog() {
+        List<Row> results =
+                query(String.format("select * from `%s`.`%s`", TEST_DB, TEST_TABLE));
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    /** Table referenced as {@code catalog.db.table}. */
+    @Test
+    public void testFullPath() {
+        List<Row> results =
+                query(
+                        String.format(
+                                "select * from %s.%s.`%s`",
+                                TEST_CATALOG_NAME,
+                                catalog.getDefaultDatabase(),
+                                TEST_TABLE));
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    /** Copies the source table into the sink table, then reads the sink back. */
+    @Test
+    public void testSelectToInsert() throws Exception {
+        String sql =
+                String.format(
+                        "insert into `%s` select * from `%s`",
+                        TEST_TABLE_SINK, TEST_TABLE);
+        tEnv.executeSql(sql).await();
+
+        List<Row> results = query(String.format("select * from %s", TEST_TABLE_SINK));
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    /** Writes one row per {@code c_string} group, carrying the largest {@code id} of the group. */
+    @Test
+    public void testGroupByInsert() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "insert into `%s` select  `c_string`, max(`id`) `id` from `%s` "
+                                        + "group by `c_string` ",
+                                TEST_TABLE_SINK_GROUPBY, TEST_TABLE))
+                .await();
+
+        List<Row> results =
+                query(String.format("select * from `%s`", TEST_TABLE_SINK_GROUPBY));
+        assertEquals(
+                Lists.newArrayList(Row.ofKind(RowKind.INSERT, "catalog", "100002")),
+                results);
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
index a44b96d..4ab44bf 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.source.reader;
 import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Unit tests for the {@link DorisSourceReader}.
  */
+@Ignore
 public class DorisSourceReaderTest {
 
     private static DorisSourceReader createReader(TestingReaderContext 
context) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to