This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 706baa0  [Feature] Refactoring DorisSource based on FLIP-27 (#24)
706baa0 is described below

commit 706baa0348224059c3754a78d0a957e0258811ca
Author: wudi <676366...@qq.com>
AuthorDate: Thu Apr 21 10:40:47 2022 +0800

    [Feature] Refactoring DorisSource based on FLIP-27 (#24)
    
    * Refactoring DorisSource based on FLIP-27
---
 .../apache/doris/flink/cfg/DorisReadOptions.java   |  17 ++-
 .../DorisDeserializationSchema.java                |   8 ++
 .../RowDataDeserializationSchema.java              |  50 ++++++++
 .../SimpleListDeserializationSchema.java           |   7 +-
 .../converter/DorisRowConverter.java               | 128 +++++++++++++++++++++
 .../doris/flink/rest/PartitionDefinition.java      |  14 +--
 .../org/apache/doris/flink/rest/RestService.java   |   2 +-
 .../apache/doris/flink/serialization/RowBatch.java |   5 +-
 .../doris/flink/sink/writer/DorisStreamLoad.java   |  14 +--
 .../org/apache/doris/flink/source/DorisSource.java | 123 ++++++++++++++++++++
 .../doris/flink/source/DorisSourceBuilder.java     |  68 +++++++++++
 .../flink/source/assigners/DorisSplitAssigner.java |  53 +++++++++
 .../source/assigners/SimpleSplitAssigner.java      |  58 ++++++++++
 .../source/enumerator/DorisSourceEnumerator.java   |  97 ++++++++++++++++
 .../source/enumerator/PendingSplitsCheckpoint.java |  50 ++++++++
 .../PendingSplitsCheckpointSerializer.java         | 124 ++++++++++++++++++++
 .../flink/source/reader/DorisRecordEmitter.java    |  62 ++++++++++
 .../flink/source/reader/DorisSourceReader.java     |  69 +++++++++++
 .../source/reader/DorisSourceSplitReader.java      | 103 +++++++++++++++++
 .../doris/flink/source/split/DorisSourceSplit.java |  73 ++++++++++++
 .../source/split/DorisSourceSplitSerializer.java   | 110 ++++++++++++++++++
 .../split/DorisSourceSplitState.java}              |  17 ++-
 .../flink/source/split/DorisSplitRecords.java      |  81 +++++++++++++
 .../flink/table/DorisDynamicTableFactory.java      |  15 ++-
 .../doris/flink/table/DorisDynamicTableSource.java |  48 +++++---
 .../doris/flink/table/DorisRowDataInputFormat.java |  22 ++--
 .../RowDataDeserializationSchemaTest.java          |  65 +++++++++++
 .../convert/DorisRowConverterTest.java             |  62 ++++++++++
 .../org/apache/doris/flink/sink/OptionUtils.java   |  27 +++--
 .../doris/flink/source/DorisSourceExampleTest.java |  48 ++++++++
 .../PendingSplitsCheckpointSerializerTest.java     |  50 ++++++++
 .../flink/source/reader/DorisSourceReaderTest.java |  69 +++++++++++
 .../flink/source/reader/TestingReaderContext.java  |  98 ++++++++++++++++
 .../split/DorisSourceSplitSerializerTest.java      |  44 +++++++
 .../flink/source/split/DorisSplitRecordsTest.java} |  23 ++--
 .../flink/table/DorisDynamicTableSourceTest.java   |  86 ++++++++++++++
 .../org/apache/doris/flink/utils/FactoryMocks.java |  46 ++++++++
 37 files changed, 1953 insertions(+), 83 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 0beb18c..aead8a8 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -37,10 +37,11 @@ public class DorisReadOptions implements Serializable {
     private Long execMemLimit;
     private Integer deserializeQueueSize;
     private Boolean deserializeArrowAsync;
+    private boolean useOldApi;
 
     public DorisReadOptions(String readFields, String filterQuery, Integer 
requestTabletSize, Integer requestConnectTimeoutMs, Integer 
requestReadTimeoutMs,
                             Integer requestQueryTimeoutS, Integer 
requestRetries, Integer requestBatchSize, Long execMemLimit,
-                            Integer deserializeQueueSize, Boolean 
deserializeArrowAsync) {
+                            Integer deserializeQueueSize, Boolean 
deserializeArrowAsync, boolean useOldApi) {
         this.readFields = readFields;
         this.filterQuery = filterQuery;
         this.requestTabletSize = requestTabletSize;
@@ -52,6 +53,7 @@ public class DorisReadOptions implements Serializable {
         this.execMemLimit = execMemLimit;
         this.deserializeQueueSize = deserializeQueueSize;
         this.deserializeArrowAsync = deserializeArrowAsync;
+        this.useOldApi = useOldApi;
     }
 
     public String getReadFields() {
@@ -98,12 +100,15 @@ public class DorisReadOptions implements Serializable {
         return deserializeArrowAsync;
     }
 
+    public boolean getUseOldApi() {
+        return useOldApi;
+    }
 
     public static Builder builder() {
         return new Builder();
     }
 
-    public static DorisReadOptions defaults(){
+    public static DorisReadOptions defaults() {
         return DorisReadOptions.builder().build();
     }
 
@@ -123,6 +128,7 @@ public class DorisReadOptions implements Serializable {
         private Long execMemLimit;
         private Integer deserializeQueueSize;
         private Boolean deserializeArrowAsync;
+        private Boolean useOldApi = false;
 
 
         public Builder setReadFields(String readFields) {
@@ -180,8 +186,13 @@ public class DorisReadOptions implements Serializable {
             return this;
         }
 
+        public Builder setUseOldApi(boolean useOldApi) {
+            this.useOldApi = useOldApi;
+            return this;
+        }
+
         public DorisReadOptions build() {
-            return new DorisReadOptions(readFields, filterQuery, 
requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, 
requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, 
deserializeQueueSize, deserializeArrowAsync);
+            return new DorisReadOptions(readFields, filterQuery, 
requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, 
requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, 
deserializeQueueSize, deserializeArrowAsync, useOldApi);
         }
 
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
index 2aaec99..608fa5c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
@@ -17,8 +17,16 @@
 package org.apache.doris.flink.deserialization;
 
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
+import java.util.List;
 
+/**
+ * The deserialization schema describes how to turn the doris list record into 
data types
+ * (Java/Scala objects) that are processed by Flink.
+ **/
 public interface DorisDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
+
+    void deserialize(List<?> record, Collector<T> out) throws Exception;
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
new file mode 100644
index 0000000..f9c65b3
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.deserialization;
+
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import java.util.List;
+
+
+/**
+ * A simple implementation of {@link DorisDeserializationSchema} which 
converts the received
+ * list record into {@link GenericRowData}.
+ */
+public class RowDataDeserializationSchema implements 
DorisDeserializationSchema<RowData> {
+
+    private final DorisRowConverter rowConverter;
+
+    public RowDataDeserializationSchema(RowType rowType) {
+        this.rowConverter = new DorisRowConverter(rowType);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return TypeInformation.of(RowData.class);
+    }
+
+    @Override
+    public void deserialize(List<?> record, Collector<RowData> out) throws 
Exception {
+        RowData row = rowConverter.convert(record);
+        out.collect(row);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
index d9ec6e5..43e68ef 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
@@ -19,10 +19,10 @@ package org.apache.doris.flink.deserialization;
 
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
 
 import java.util.List;
 
-
 public class SimpleListDeserializationSchema implements 
DorisDeserializationSchema<List<?>> {
 
     @Override
@@ -30,4 +30,9 @@ public class SimpleListDeserializationSchema implements 
DorisDeserializationSche
         return TypeInformation.of(new TypeHint<List<?>>() {
         });
     }
+
+    @Override
+    public void deserialize(List<?> record, Collector<List<?>> out) throws 
Exception {
+        out.collect(record);
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
new file mode 100644
index 0000000..b84fcdb
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.deserialization.converter;
+
+import org.apache.doris.flink.serialization.RowBatch;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class DorisRowConverter implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final DeserializationConverter[] deserializationConverters;
+
+    public DorisRowConverter(RowType rowType) {
+        checkNotNull(rowType);
+        this.deserializationConverters = new 
DeserializationConverter[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            deserializationConverters[i] = 
createNullableConverter(rowType.getTypeAt(i));
+        }
+    }
+
+    /**
+     * Convert data retrieved from {@link RowBatch} to  {@link RowData}.
+     *
+     * @param record from rowBatch
+     */
+    public GenericRowData convert(List record){
+        GenericRowData rowData = new GenericRowData(record.size());
+        for (int i = 0; i < record.size(); i++) {
+            rowData.setField(i, 
deserializationConverters[i].deserialize(record.get(i)));
+        }
+        return rowData;
+    }
+
+
+    /**
+     * Create a nullable runtime {@link DeserializationConverter} from given 
{@link
+     * LogicalType}.
+     */
+    protected DeserializationConverter createNullableConverter(LogicalType 
type) {
+        return wrapIntoNullableInternalConverter(createConverter(type));
+    }
+
+    protected DeserializationConverter wrapIntoNullableInternalConverter(
+            DeserializationConverter deserializationConverter) {
+        return val -> {
+            if (val == null) {
+                return null;
+            } else {
+                return deserializationConverter.deserialize(val);
+            }
+        };
+    }
+
+    /** Runtime converter to convert doris field to {@link RowData} type 
object. */
+    @FunctionalInterface
+    interface DeserializationConverter extends Serializable {
+        /**
+         * Convert a doris field object of {@link RowBatch } to the  data 
structure object.
+         *
+         * @param field
+         */
+        Object deserialize(Object field);
+    }
+
+    protected DeserializationConverter createConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return val -> null;
+            case BOOLEAN:
+            case FLOAT:
+            case DOUBLE:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return val -> val;
+            case DECIMAL:
+                final int precision = ((DecimalType) type).getPrecision();
+                final int scale = ((DecimalType) type).getScale();
+                return val -> DecimalData.fromBigDecimal((BigDecimal) val, 
precision, scale);
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case DATE:
+            case CHAR:
+            case VARCHAR:
+                return val -> StringData.fromString((String) val);
+            case TIME_WITHOUT_TIME_ZONE:
+            case BINARY:
+            case VARBINARY:
+            case ARRAY:
+            case ROW:
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type:" + 
type);
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
index 8a66f76..e753d66 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.flink.rest;
 
-import org.apache.doris.flink.cfg.DorisOptions;
-
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashSet;
@@ -35,16 +33,10 @@ public class PartitionDefinition implements Serializable, 
Comparable<PartitionDe
     private final String beAddress;
     private final Set<Long> tabletIds;
     private final String queryPlan;
-    private final String serializedSettings;
 
     public PartitionDefinition(String database, String table,
-                               DorisOptions settings, String beAddress, 
Set<Long> tabletIds, String queryPlan)
+                               String beAddress, Set<Long> tabletIds, String 
queryPlan)
             throws IllegalArgumentException {
-        if (settings != null) {
-            this.serializedSettings = settings.save();
-        } else {
-            this.serializedSettings = null;
-        }
         this.database = database;
         this.table = table;
         this.beAddress = beAddress;
@@ -72,7 +64,6 @@ public class PartitionDefinition implements Serializable, 
Comparable<PartitionDe
         return queryPlan;
     }
 
-
     @Override
     public int compareTo(PartitionDefinition o) {
         int cmp = database.compareTo(o.database);
@@ -123,8 +114,7 @@ public class PartitionDefinition implements Serializable, 
Comparable<PartitionDe
                 Objects.equals(table, that.table) &&
                 Objects.equals(beAddress, that.beAddress) &&
                 Objects.equals(tabletIds, that.tabletIds) &&
-                Objects.equals(queryPlan, that.queryPlan) &&
-                Objects.equals(serializedSettings, that.serializedSettings);
+                Objects.equals(queryPlan, that.queryPlan) ;
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 6ac84bc..734bfdb 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -675,7 +675,7 @@ public class RestService implements Serializable {
                         first, Math.min(beInfo.getValue().size(), first + 
tabletsSize)));
                 first = first + tabletsSize;
                 PartitionDefinition partitionDefinition =
-                        new PartitionDefinition(database, table, options,
+                        new PartitionDefinition(database, table,
                                 beInfo.getKey(), partitionTablets, 
opaquedQueryPlan);
                 logger.debug("Generate one PartitionDefinition '{}'.", 
partitionDefinition);
                 partitions.add(partitionDefinition);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index d235aa9..3be6f87 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -36,6 +36,8 @@ import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -44,9 +46,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * row batch data container.
  */
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 7849559..b468cc8 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -17,10 +17,6 @@
 
 package org.apache.doris.flink.sink.writer;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -31,7 +27,9 @@ import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.ResponseUtil;
-
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -41,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -53,11 +50,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 
 import static org.apache.doris.flink.sink.LoadStatus.FAIL;
-import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
 import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
-import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
-import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
-import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
new file mode 100644
index 0000000..e2b6d41
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.source.assigners.DorisSplitAssigner;
+import org.apache.doris.flink.source.assigners.SimpleSplitAssigner;
+import org.apache.doris.flink.source.enumerator.DorisSourceEnumerator;
+import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
+import 
org.apache.doris.flink.source.enumerator.PendingSplitsCheckpointSerializer;
+import org.apache.doris.flink.source.reader.DorisRecordEmitter;
+import org.apache.doris.flink.source.reader.DorisSourceReader;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.doris.flink.source.split.DorisSourceSplitSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * DorisSource based on FLIP-27 which is a BOUNDED stream.
+ **/
+public class DorisSource<OUT> implements Source<OUT, DorisSourceSplit, 
PendingSplitsCheckpoint>,
+        ResultTypeQueryable<OUT> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisSource.class);
+
+    private final DorisOptions options;
+    private final DorisReadOptions readOptions;
+
+    // Boundedness
+    private final Boundedness boundedness;
+    private final DorisDeserializationSchema<OUT> deserializer;
+
+    public DorisSource(DorisOptions options,
+                       DorisReadOptions readOptions,
+                       Boundedness boundedness,
+                       DorisDeserializationSchema<OUT> deserializer) {
+        this.options = options;
+        this.readOptions = readOptions;
+        this.boundedness = boundedness;
+        this.deserializer = deserializer;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return this.boundedness;
+    }
+
+    @Override
+    public SourceReader<OUT, DorisSourceSplit> 
createReader(SourceReaderContext readerContext) throws Exception {
+        return new DorisSourceReader<>(
+                options,
+                readOptions,
+                new DorisRecordEmitter<>(deserializer),
+                readerContext,
+                readerContext.getConfiguration()
+        );
+    }
+
+    @Override
+    public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> 
createEnumerator(SplitEnumeratorContext<DorisSourceSplit> context) throws 
Exception {
+        List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
+        List<PartitionDefinition> partitions = 
RestService.findPartitions(options, readOptions, LOG);
+        partitions.forEach(m -> dorisSourceSplits.add(new 
DorisSourceSplit(m)));
+        DorisSplitAssigner splitAssigner = new 
SimpleSplitAssigner(dorisSourceSplits);
+
+        return new DorisSourceEnumerator(context, splitAssigner);
+    }
+
+    @Override
+    public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> 
restoreEnumerator(
+            SplitEnumeratorContext<DorisSourceSplit> context,
+            PendingSplitsCheckpoint checkpoint) throws Exception {
+        Collection<DorisSourceSplit> splits = checkpoint.getSplits();
+        DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(splits);
+        return new DorisSourceEnumerator(context, splitAssigner);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<DorisSourceSplit> getSplitSerializer() {
+        return DorisSourceSplitSerializer.INSTANCE;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PendingSplitsCheckpoint> 
getEnumeratorCheckpointSerializer() {
+        return new PendingSplitsCheckpointSerializer(getSplitSerializer());
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializer.getProducedType();
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
new file mode 100644
index 0000000..8d96c08
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+
+/**
+ * The builder class for {@link DorisSource} to make it easier for the users 
to construct a {@link
+ * DorisSource}.
+ **/
+public class DorisSourceBuilder<OUT> {
+
+    private DorisOptions options;
+    private DorisReadOptions readOptions;
+
+    // Boundedness
+    private Boundedness boundedness;
+    private DorisDeserializationSchema<OUT> deserializer;
+
+    DorisSourceBuilder() {
+        boundedness = Boundedness.BOUNDED;
+    }
+
+    public static <OUT> DorisSourceBuilder<OUT> builder() {
+        return new DorisSourceBuilder();
+    }
+
+    public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) {
+        this.options = options;
+        return this;
+    }
+
+    public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions 
readOptions) {
+        this.readOptions = readOptions;
+        return this;
+    }
+
+    public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) {
+        this.boundedness = boundedness;
+        return this;
+    }
+
+    public DorisSourceBuilder<OUT> 
setDeserializer(DorisDeserializationSchema<OUT> deserializer) {
+        this.deserializer = deserializer;
+        return this;
+    }
+
+    public DorisSource<OUT> build() {
+        return new DorisSource<>(options, readOptions, boundedness, 
deserializer);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java
new file mode 100644
index 0000000..bf541b3
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.assigners;
+
+import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code DorisSplitAssigner} is responsible for deciding what split should be processed.
+ * It determines split processing order.
+ */
+public interface DorisSplitAssigner {
+
+    /**
+     * Gets the next split.
+     *
+     * <p>When this method returns an empty {@code Optional}, then the set of splits is
+     * assumed to be done and the source will finish once the readers finished their
+     * current splits.
+     *
+     * @param hostname the hostname forwarded from the split request, may be {@code null};
+     *     implementations are free to ignore it
+     * @return the next split to process, or an empty {@code Optional} if none is left
+     */
+    Optional<DorisSourceSplit> getNext(@Nullable String hostname);
+
+    /**
+     * Adds a set of splits to this assigner. This happens for example when some split
+     * processing failed and the splits need to be re-added, or when new splits got
+     * discovered.
+     *
+     * @param splits the splits to make available for assignment again
+     */
+    void addSplits(Collection<DorisSourceSplit> splits);
+
+    /**
+     * Creates a snapshot of the state of this split assigner, to be stored in a checkpoint.
+     *
+     * @param checkpointId The ID of the checkpoint for which the snapshot is created.
+     * @return an object containing the state of the split enumerator.
+     */
+    PendingSplitsCheckpoint snapshotState(long checkpointId);
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
new file mode 100644
index 0000000..eef17f2
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.assigners;
+
+import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code SimpleSplitAssigner} hands out splits without any consideration for locality:
+ * the requested hostname is ignored and splits are taken from the end of the internal list,
+ * so the most recently added split is handed out first.
+ **/
+public class SimpleSplitAssigner implements DorisSplitAssigner {
+
+    /** Splits that have not been handed out yet. */
+    private final ArrayList<DorisSourceSplit> splits;
+
+    /**
+     * Creates an assigner over a private copy of the given splits.
+     *
+     * @param splits the initial set of pending splits
+     */
+    public SimpleSplitAssigner(Collection<DorisSourceSplit> splits) {
+        this.splits = new ArrayList<>(splits);
+    }
+
+    /**
+     * Removes and returns the last pending split.
+     *
+     * @param hostname ignored by this assigner
+     * @return the next split, or an empty {@code Optional} when no split is pending
+     */
+    @Override
+    public Optional<DorisSourceSplit> getNext(@Nullable String hostname) {
+        final int size = splits.size();
+        return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
+    }
+
+    /**
+     * Puts the given splits back into the set of pending splits.
+     *
+     * @param splits the splits to re-add
+     */
+    @Override
+    public void addSplits(Collection<DorisSourceSplit> splits) {
+        // The parameter shadows the field, so the field must be qualified with "this";
+        // otherwise the argument would be added to itself and the splits would be lost.
+        this.splits.addAll(splits);
+    }
+
+    /**
+     * Captures the currently pending splits.
+     *
+     * @param checkpointId the ID of the checkpoint (not used by this assigner)
+     * @return a checkpoint holding a copy of the pending splits
+     */
+    @Override
+    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
+        // Copy the list so later getNext/addSplits calls cannot alter the snapshot.
+        return new PendingSplitsCheckpoint(new ArrayList<>(splits));
+    }
+
+    @Override
+    public String toString() {
+        return "SimpleSplitAssigner " + splits;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
new file mode 100644
index 0000000..3275a22
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.enumerator;
+
+import org.apache.doris.flink.source.DorisSource;
+import org.apache.doris.flink.source.assigners.DorisSplitAssigner;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A SplitEnumerator implementation for bounded / batch {@link DorisSource} input.
+ * <p>
+ * Splits are obtained from a {@link DorisSplitAssigner} and handed to readers on request.
+ * A reader is told that no more splits exist as soon as the assigner has nothing left.
+ */
+public class DorisSourceEnumerator
+        implements SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSourceEnumerator.class);
+    private final SplitEnumeratorContext<DorisSourceSplit> context;
+
+    private final DorisSplitAssigner splitAssigner;
+
+    /**
+     * @param context the enumerator context used to talk to the readers
+     * @param splitAssigner the assigner that owns the pending splits; must not be null
+     */
+    public DorisSourceEnumerator(
+            SplitEnumeratorContext<DorisSourceSplit> context,
+            DorisSplitAssigner splitAssigner) {
+        this.context = context;
+        this.splitAssigner = checkNotNull(splitAssigner);
+    }
+
+    @Override
+    public void start() {
+        // no resources to start
+    }
+
+    /**
+     * Answers a reader's split request with either the next split or a
+     * "no more splits" signal.
+     */
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String hostname) {
+        final boolean readerRegistered = context.registeredReaders().containsKey(subtaskId);
+        if (!readerRegistered) {
+            // reader failed between sending the request and now. skip this request.
+            return;
+        }
+
+        final Optional<DorisSourceSplit> candidate = splitAssigner.getNext(hostname);
+        if (!candidate.isPresent()) {
+            context.signalNoMoreSplits(subtaskId);
+            LOG.info("No more splits available for subtask {}", subtaskId);
+            return;
+        }
+
+        final DorisSourceSplit assigned = candidate.get();
+        context.assignSplit(assigned, subtaskId);
+        LOG.info("Assigned split to subtask {} : {}", subtaskId, assigned);
+    }
+
+    /** Returns the given splits to the assigner so they can be handed out again. */
+    @Override
+    public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
+        LOG.debug("Doris Source Enumerator adds splits back: {}", splits);
+        splitAssigner.addSplits(splits);
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // do nothing
+    }
+
+    /** Delegates the snapshot to the split assigner. */
+    @Override
+    public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception {
+        return splitAssigner.snapshotState(checkpointId);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // no resources to close
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpoint.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpoint.java
new file mode 100644
index 0000000..3edba0f
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpoint.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.enumerator;
+
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+
+/**
+ * A checkpoint of the split enumerator's state, containing the currently pending splits
+ * that are not yet assigned.
+ */
+public class PendingSplitsCheckpoint {
+
+    /**
+     * The splits in the checkpoint. The collection passed to the constructor is stored
+     * by reference; no copy is made.
+     */
+    private final Collection<DorisSourceSplit> splits;
+
+    /**
+     * The cached byte representation from the last serialization step. This helps to
+     * avoid paying repeated serialization cost for the same checkpoint object. This field
+     * is used by {@link PendingSplitsCheckpointSerializer}; it is {@code null} until the
+     * checkpoint has been serialized once.
+     */
+    @Nullable
+    byte[] serializedFormCache;
+
+    public PendingSplitsCheckpoint(Collection<DorisSourceSplit> splits) {
+        this.splits = splits;
+    }
+
+    public Collection<DorisSourceSplit> getSplits() {
+        return splits;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializer.java
new file mode 100644
index 0000000..deeb544
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializer.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.enumerator;
+
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for the {@link PendingSplitsCheckpoint}.
+ */
+public class PendingSplitsCheckpointSerializer
+        implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {
+
+    private static final int VERSION = 1;
+
+    private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;
+
+    private final SimpleVersionedSerializer<DorisSourceSplit> splitSerializer;
+
+    public 
PendingSplitsCheckpointSerializer(SimpleVersionedSerializer<DorisSourceSplit> 
splitSerializer) {
+        this.splitSerializer = checkNotNull(splitSerializer);
+    }
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws 
IOException {
+        // optimization: the splits lazily cache their own serialized form
+        if (checkpoint.serializedFormCache != null) {
+            return checkpoint.serializedFormCache;
+        }
+
+        Collection<DorisSourceSplit> splits = checkpoint.getSplits();
+        final ArrayList<byte[]> serializedSplits = new 
ArrayList<>(splits.size());
+
+        int totalLen =
+                12; // four ints: magic, version of split serializer, count 
splits
+
+        for (DorisSourceSplit split : splits) {
+            final byte[] serSplit = splitSerializer.serialize(split);
+            serializedSplits.add(serSplit);
+            totalLen += serSplit.length + 4; // 4 bytes for the length field
+        }
+
+        final byte[] result = new byte[totalLen];
+        final ByteBuffer byteBuffer = 
ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
+        byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
+        byteBuffer.putInt(splitSerializer.getVersion());
+        byteBuffer.putInt(serializedSplits.size());
+
+        for (byte[] splitBytes : serializedSplits) {
+            byteBuffer.putInt(splitBytes.length);
+            byteBuffer.put(splitBytes);
+        }
+
+        assert byteBuffer.remaining() == 0;
+
+        // optimization: cache the serialized from, so we avoid the byte work 
during repeated
+        // serialization
+        checkpoint.serializedFormCache = result;
+
+        return result;
+    }
+
+    @Override
+    public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) 
throws IOException {
+        if (version == 1) {
+            return deserialize(serialized);
+        }
+        throw new IOException("Unknown version: " + version);
+    }
+
+    private PendingSplitsCheckpoint deserialize(byte[] serialized) throws 
IOException {
+        final ByteBuffer bb = 
ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+        final int magic = bb.getInt();
+        if (magic != VERSION_1_MAGIC_NUMBER) {
+            throw new IOException(
+                    String.format(
+                            "Invalid magic number for PendingSplitsCheckpoint. 
"
+                                    + "Expected: %X , found %X",
+                            VERSION_1_MAGIC_NUMBER, magic));
+        }
+
+        final int splitSerializerVersion = bb.getInt();
+        final int numSplits = bb.getInt();
+
+        SimpleVersionedSerializer<DorisSourceSplit> splitSerializer = 
this.splitSerializer;// stack cache
+        final ArrayList<DorisSourceSplit> splits = new ArrayList<>(numSplits);
+
+        for (int remaining = numSplits; remaining > 0; remaining--) {
+            final byte[] bytes = new byte[bb.getInt()];
+            bb.get(bytes);
+            final DorisSourceSplit split = 
splitSerializer.deserialize(splitSerializerVersion, bytes);
+            splits.add(split);
+        }
+        return new PendingSplitsCheckpoint(splits);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisRecordEmitter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisRecordEmitter.java
new file mode 100644
index 0000000..61c31fc
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisRecordEmitter.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.doris.flink.source.split.DorisSourceSplitState;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * The {@link RecordEmitter} implementation for {@link DorisSourceReader}.
+ *
+ * <p>Each incoming {@code List} value is passed to the configured
+ * {@link DorisDeserializationSchema}; whatever the schema collects is forwarded to the
+ * {@link SourceOutput} of the current call.
+ **/
+public class DorisRecordEmitter<T>
+        implements RecordEmitter<List, T, DorisSourceSplitState> {
+
+    private final DorisDeserializationSchema<T> dorisDeserializationSchema;
+    private final OutputCollector<T> outputCollector;
+
+    /**
+     * @param dorisDeserializationSchema schema that converts a raw value into records
+     */
+    public DorisRecordEmitter(DorisDeserializationSchema<T> dorisDeserializationSchema) {
+        this.outputCollector = new OutputCollector<>();
+        this.dorisDeserializationSchema = dorisDeserializationSchema;
+    }
+
+    /** Deserializes {@code value} and emits the resulting records to {@code output}. */
+    @Override
+    public void emitRecord(
+            List value, SourceOutput<T> output, DorisSourceSplitState splitState)
+            throws Exception {
+        outputCollector.redirectTo(output);
+        dorisDeserializationSchema.deserialize(value, outputCollector);
+    }
+
+    /** Reusable {@link Collector} that forwards records to the most recently set output. */
+    private static class OutputCollector<T> implements Collector<T> {
+        private SourceOutput<T> target;
+
+        /** Points this collector at the output of the current emit call. */
+        private void redirectTo(SourceOutput<T> newTarget) {
+            this.target = newTarget;
+        }
+
+        @Override
+        public void collect(T record) {
+            target.collect(record);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java
new file mode 100644
index 0000000..db63d19
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.doris.flink.source.split.DorisSourceSplitState;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link SourceReader} that read records from {@link DorisSourceSplit}.
+ *
+ * <p>The reader asks for one split when it starts without any assigned split and asks
+ * for another one every time splits are reported as finished.
+ **/
+public class DorisSourceReader<T>
+        extends SingleThreadMultiplexSourceReaderBase<
+                List, T, DorisSourceSplit, DorisSourceSplitState> {
+
+    /**
+     * @param options Doris options passed to every created split reader
+     * @param readOptions Doris read options passed to every created split reader
+     * @param recordEmitter emitter that turns fetched values into output records
+     * @param context the reader context
+     * @param config the reader configuration
+     */
+    public DorisSourceReader(
+            DorisOptions options,
+            DorisReadOptions readOptions,
+            RecordEmitter<List, T, DorisSourceSplitState> recordEmitter,
+            SourceReaderContext context,
+            Configuration config) {
+        super(
+                () -> new DorisSourceSplitReader(options, readOptions),
+                recordEmitter,
+                config,
+                context);
+    }
+
+    @Override
+    public void start() {
+        // splits restored from a checkpoint are processed first; only an idle
+        // reader asks the enumerator for work
+        final boolean hasAssignedSplits = getNumberOfCurrentlyAssignedSplits() != 0;
+        if (hasAssignedSplits) {
+            return;
+        }
+        context.sendSplitRequest();
+    }
+
+    /** Requests the next split once the current ones are finished. */
+    @Override
+    protected void onSplitFinished(Map<String, DorisSourceSplitState> finishedSplitIds) {
+        context.sendSplitRequest();
+    }
+
+    /** Wraps a newly assigned split into its mutable state object. */
+    @Override
+    protected DorisSourceSplitState initializedState(DorisSourceSplit split) {
+        return new DorisSourceSplitState(split);
+    }
+
+    /** Converts the split state back into a split. */
+    @Override
+    protected DorisSourceSplit toSplitType(String splitId, DorisSourceSplitState splitState) {
+        return splitState.toDorisSourceSplit();
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
new file mode 100644
index 0000000..aadf6fd
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.datastream.ScalaValueReader;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.doris.flink.source.split.DorisSplitRecords;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * The {@link SplitReader} implementation for the doris source.
+ *
+ * <p>Assigned splits are queued and read one after another; a {@link ScalaValueReader}
+ * is opened for the split currently being read and closed when it has no more data.
+ **/
+public class DorisSourceSplitReader
+        implements SplitReader<List, DorisSourceSplit> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSourceSplitReader.class);
+
+    private final Queue<DorisSourceSplit> splits;
+    private final DorisOptions options;
+    private final DorisReadOptions readOptions;
+    private ScalaValueReader scalaValueReader;
+    private String currentSplitId;
+
+    /**
+     * @param options Doris options used when opening a value reader
+     * @param readOptions Doris read options used when opening a value reader
+     */
+    public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions) {
+        this.splits = new ArrayDeque<>();
+        this.options = options;
+        this.readOptions = readOptions;
+    }
+
+    /**
+     * Returns records of the current split, or a "split finished" marker once the
+     * current value reader reports no more data.
+     *
+     * @throws IOException if no split is open and none is queued
+     */
+    @Override
+    public RecordsWithSplitIds<List> fetch() throws IOException {
+        checkSplitOrStartNext();
+
+        if (scalaValueReader.hasNext()) {
+            return DorisSplitRecords.forRecords(currentSplitId, scalaValueReader);
+        }
+        return finishSplit();
+    }
+
+    /** Opens a value reader for the next queued split unless one is already open. */
+    private void checkSplitOrStartNext() throws IOException {
+        if (scalaValueReader == null) {
+            final DorisSourceSplit upcoming = splits.poll();
+            if (upcoming == null) {
+                throw new IOException("Cannot fetch from another split - no split remaining");
+            }
+            currentSplitId = upcoming.splitId();
+            scalaValueReader =
+                    new ScalaValueReader(
+                            upcoming.getPartitionDefinition(), options, readOptions);
+        }
+    }
+
+    /** Closes the open value reader and reports the current split as finished. */
+    private DorisSplitRecords finishSplit() {
+        if (scalaValueReader != null) {
+            scalaValueReader.close();
+            scalaValueReader = null;
+        }
+        final String finishedSplitId = currentSplitId;
+        currentSplitId = null;
+        return DorisSplitRecords.finishedSplit(finishedSplitId);
+    }
+
+    /** Queues the splits carried by the change. */
+    @Override
+    public void handleSplitsChanges(SplitsChange<DorisSourceSplit> splitsChange) {
+        LOG.debug("Handling split change {}", splitsChange);
+        splits.addAll(splitsChange.splits());
+    }
+
+    @Override
+    public void wakeUp() {
+    }
+
+    /** Closes the value reader of the split being read, if any. */
+    @Override
+    public void close() throws Exception {
+        if (scalaValueReader == null) {
+            return;
+        }
+        scalaValueReader.close();
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
new file mode 100644
index 0000000..c6a644d
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.split;
+
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * A {@link SourceSplit} that represents a {@link PartitionDefinition}.
+ *
+ * <p>Two splits are equal when their partition definitions are equal; {@link #hashCode()}
+ * is derived from the same field so that splits behave correctly in hash-based collections.
+ **/
+public class DorisSourceSplit implements SourceSplit {
+
+    private final PartitionDefinition partitionDefinition;
+
+    /**
+     * The splits are frequently serialized into checkpoints. Caching the byte
+     * representation makes repeated serialization cheap. This field is used by
+     * {@link DorisSourceSplitSerializer}.
+     */
+    @Nullable
+    transient byte[] serializedFormCache;
+
+    /**
+     * @param partitionDefinition the partition this split stands for
+     */
+    public DorisSourceSplit(PartitionDefinition partitionDefinition) {
+        this.partitionDefinition = partitionDefinition;
+    }
+
+    /**
+     * Returns the backend address of the partition as the split id.
+     *
+     * <p>NOTE(review): if several splits can point at the same backend address, their
+     * ids would collide - confirm that at most one split per backend is created.
+     */
+    @Override
+    public String splitId() {
+        return partitionDefinition.getBeAddress();
+    }
+
+    /** Returns the partition definition wrapped by this split. */
+    public PartitionDefinition getPartitionDefinition() {
+        return partitionDefinition;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("DorisSourceSplit: %s.%s,be=%s,tablets=%s",
+                partitionDefinition.getDatabase(),
+                partitionDefinition.getTable(),
+                partitionDefinition.getBeAddress(),
+                partitionDefinition.getTabletIds());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DorisSourceSplit that = (DorisSourceSplit) o;
+
+        return Objects.equals(partitionDefinition, that.partitionDefinition);
+    }
+
+    /**
+     * Hash code based on the partition definition only, mirroring {@link #equals(Object)};
+     * the serialization cache is deliberately left out.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(partitionDefinition);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
new file mode 100644
index 0000000..d667ed6
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.split;
+
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A serializer for the {@link DorisSourceSplit}.
+ *
+ * <p>Version 1 layout: database, table and BE address as UTF strings, the tablet ids as an int
+ * count followed by that many longs, and finally the query plan as a UTF string.
+ **/
+public class DorisSourceSplitSerializer
+        implements SimpleVersionedSerializer<DorisSourceSplit> {
+
+    public static final DorisSourceSplitSerializer INSTANCE = new DorisSourceSplitSerializer();
+
+    // one reusable output buffer per thread; it must be left empty after every use
+    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+            ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
+
+    private static final int VERSION = 1;
+
+    /** Writes the array length followed by every element. */
+    private static void writeLongArray(DataOutputView out, Long[] values) throws IOException {
+        out.writeInt(values.length);
+        for (Long val : values) {
+            out.writeLong(val);
+        }
+    }
+
+    /** Reads an array written by {@link #writeLongArray(DataOutputView, Long[])}. */
+    private static Long[] readLongArray(DataInputView in) throws IOException {
+        final int len = in.readInt();
+        final Long[] values = new Long[len];
+        for (int i = 0; i < len; i++) {
+            values[i] = in.readLong();
+        }
+        return values;
+    }
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    /**
+     * Serializes the split, reusing the byte array cached on the split when there is one.
+     *
+     * @param split the split to serialize
+     * @return the serialized bytes; the same array is returned on repeated calls for one split
+     * @throws IOException if writing to the output buffer fails
+     */
+    @Override
+    public byte[] serialize(DorisSourceSplit split) throws IOException {
+
+        // optimization: the splits lazily cache their own serialized form
+        if (split.serializedFormCache != null) {
+            return split.serializedFormCache;
+        }
+
+        final DataOutputSerializer out = SERIALIZER_CACHE.get();
+        final byte[] result;
+        try {
+            PartitionDefinition partDef = split.getPartitionDefinition();
+            out.writeUTF(partDef.getDatabase());
+            out.writeUTF(partDef.getTable());
+            out.writeUTF(partDef.getBeAddress());
+            writeLongArray(out, partDef.getTabletIds().toArray(new Long[0]));
+            out.writeUTF(partDef.getQueryPlan());
+
+            result = out.getCopyOfBuffer();
+        } finally {
+            // always reset the shared per-thread buffer: if a write above fails, leftover bytes
+            // would otherwise be prepended to the next split serialized on this thread
+            out.clear();
+        }
+
+        // optimization: cache the serialized form, so we avoid the byte work during repeated
+        // serialization
+        split.serializedFormCache = result;
+
+        return result;
+    }
+
+    /**
+     * Restores a split from bytes produced by {@link #serialize(DorisSourceSplit)}.
+     *
+     * @param version the version the bytes were written with
+     * @param serialized the serialized split
+     * @return the restored split
+     * @throws IOException if the version is not supported or the bytes cannot be read
+     */
+    @Override
+    public DorisSourceSplit deserialize(int version, byte[] serialized) throws IOException {
+        if (version == VERSION) {
+            return deserialize(serialized);
+        }
+        throw new IOException("Unknown version: " + version);
+    }
+
+    /** Reads the fields in the order in which {@link #serialize(DorisSourceSplit)} wrote them. */
+    private DorisSourceSplit deserialize(byte[] serialized) throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+        final String database = in.readUTF();
+        final String table = in.readUTF();
+        final String beAddress = in.readUTF();
+        Long[] vals = readLongArray(in);
+        final Set<Long> tabletIds = new HashSet<>(Arrays.asList(vals));
+        final String queryPlan = in.readUTF();
+        PartitionDefinition partDef = new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan);
+        return new DorisSourceSplit(partDef);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitState.java
similarity index 67%
copy from 
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
copy to 
flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitState.java
index 2aaec99..6369a08 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitState.java
@@ -14,11 +14,20 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.flink.deserialization;
+package org.apache.doris.flink.source.split;
 
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+/**
+ * State of the reader for a {@link DorisSourceSplit}. It currently only wraps the split and holds no mutable state.
+ **/
+public class DorisSourceSplitState {
 
-import java.io.Serializable;
+    private final DorisSourceSplit split;
 
-public interface DorisDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
+    public DorisSourceSplitState(DorisSourceSplit split) {
+        this.split = split;
+    }
+
+    public DorisSourceSplit toDorisSourceSplit() {
+        return split;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
new file mode 100644
index 0000000..6f02446
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.split;
+
+import org.apache.doris.flink.datastream.ScalaValueReader;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link RecordsWithSplitIds} implementation backed by a {@link ScalaValueReader}.
+ * Records come straight from the reader; this class only adds the id of the split being
+ * read and the set of splits that have finished.
+ */
+public class DorisSplitRecords implements RecordsWithSplitIds<List> {
+
+    private final Set<String> finishedSplits;
+    private final ScalaValueReader scalaValueReader;
+    private String splitId;
+
+    public DorisSplitRecords(String splitId,
+                             ScalaValueReader scalaValueReader,
+                             Set<String> finishedSplits) {
+        this.finishedSplits = finishedSplits;
+        this.scalaValueReader = scalaValueReader;
+        this.splitId = splitId;
+    }
+
+    /** Creates a batch that reads the given split through the reader; no split is finished. */
+    public static DorisSplitRecords forRecords(
+            final String splitId, final ScalaValueReader valueReader) {
+        final Set<String> noneFinished = Collections.emptySet();
+        return new DorisSplitRecords(splitId, valueReader, noneFinished);
+    }
+
+    /** Creates a batch without records that only reports the given split as finished. */
+    public static DorisSplitRecords finishedSplit(final String splitId) {
+        final Set<String> finished = Collections.singleton(splitId);
+        return new DorisSplitRecords(null, null, finished);
+    }
+
+    @Nullable
+    @Override
+    public String nextSplit() {
+        // hand out the split id at most once: remember it, then reset the field
+        final String current = splitId;
+        splitId = null;
+        final boolean hasRecords = scalaValueReader != null && scalaValueReader.hasNext();
+        return hasRecords ? current : null;
+    }
+
+    @Nullable
+    @Override
+    public List nextRecordFromSplit() {
+        // no reader (finished-split batch) or an exhausted reader means no more records
+        if (scalaValueReader == null || !scalaValueReader.hasNext()) {
+            return null;
+        }
+        return scalaValueReader.next();
+    }
+
+    @Override
+    public Set<String> finishedSplits() {
+        return finishedSplits;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 76a7bbc..7d6455e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -19,15 +19,12 @@ package org.apache.doris.flink.table;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
@@ -37,7 +34,6 @@ import org.apache.flink.table.utils.TableSchemaUtils;
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
@@ -159,6 +155,12 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
             .defaultValue(true)
             .withDescription("whether to enable the delete function");
 
+    private static final ConfigOption<Boolean> SOURCE_USE_OLD_API = ConfigOptions
+            .key("source.use-old-api")
+            .booleanType()
+            .defaultValue(false) // false selects the FLIP-27 based source
+            .withDescription("Whether to read data using the old InputFormat based source instead of the new interface defined according to the FLIP-27 specification, default false");
+
     @Override
     public String factoryIdentifier() {
         return "doris"; // used for matching to `connector = '...'`
@@ -199,6 +201,8 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(SINK_LABEL_PREFIX);
         options.add(SINK_BUFFER_SIZE);
         options.add(SINK_BUFFER_COUNT);
+
+        options.add(SOURCE_USE_OLD_API);
         return options;
     }
 
@@ -244,7 +248,8 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
                 
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
                 
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
                 .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
-                .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
+                .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE))
+                .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API));
         return builder.build();
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 689aa47..0af8ad5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -19,9 +19,12 @@ package org.apache.doris.flink.table;
 
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.RowDataDeserializationSchema;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.PartitionDefinition;
 import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.source.DorisSource;
+import org.apache.doris.flink.source.DorisSourceBuilder;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableSchema;
@@ -30,6 +33,7 @@ import 
org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
@@ -50,12 +54,14 @@ import java.util.List;
  */
 public final class DorisDynamicTableSource implements ScanTableSource, 
LookupTableSource {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisDynamicTableSource.class);
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
     private TableSchema physicalSchema;
-    private static final Logger LOG = 
LoggerFactory.getLogger(DorisRowDataInputFormat.class);
 
-    public DorisDynamicTableSource(DorisOptions options, DorisReadOptions 
readOptions, TableSchema physicalSchema) {
+    public DorisDynamicTableSource(DorisOptions options,
+                                   DorisReadOptions readOptions,
+                                   TableSchema physicalSchema) {
         this.options = options;
         this.readOptions = readOptions;
         this.physicalSchema = physicalSchema;
@@ -70,21 +76,31 @@ public final class DorisDynamicTableSource implements 
ScanTableSource, LookupTab
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-        List<PartitionDefinition> dorisPartitions;
-        try {
-            dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
-        } catch (DorisException e) {
-            throw new RuntimeException("Failed fetch doris partitions");
+        if (readOptions.getUseOldApi()) {
+            List<PartitionDefinition> dorisPartitions;
+            try {
+                dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
+            } catch (DorisException e) {
+                throw new RuntimeException("Failed fetch doris partitions", e); // keep the cause
+            }
+            DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
+                    .setFenodes(options.getFenodes())
+                    .setUsername(options.getUsername())
+                    .setPassword(options.getPassword())
+                    .setTableIdentifier(options.getTableIdentifier())
+                    .setPartitions(dorisPartitions)
+                    .setReadOptions(readOptions)
+                    .setRowType((RowType) physicalSchema.toRowDataType().getLogicalType());
+            return InputFormatProvider.of(builder.build());
+        } else {
+            //Read data using the interface of the FLIP-27 specification
+            DorisSource<RowData> build = DorisSourceBuilder.<RowData>builder()
+                    .setDorisReadOptions(readOptions)
+                    .setDorisOptions(options)
+                    .setDeserializer(new RowDataDeserializationSchema((RowType) physicalSchema.toRowDataType().getLogicalType()))
+                    .build();
+            return SourceProvider.of(build);
         }
-        DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
-                .setFenodes(options.getFenodes())
-                .setUsername(options.getUsername())
-                .setPassword(options.getPassword())
-                .setTableIdentifier(options.getTableIdentifier())
-                .setPartitions(dorisPartitions)
-                .setReadOptions(readOptions)
-                .setRowType((RowType) physicalSchema.toRowDataType().getLogicalType());
-        return InputFormatProvider.of(builder.build());
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index be1e13d..fbcaeea 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.table;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.datastream.ScalaValueReader;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
 import org.apache.doris.flink.rest.PartitionDefinition;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
@@ -30,12 +31,13 @@ import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -43,9 +45,6 @@ import java.sql.PreparedStatement;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * InputFormat for {@link DorisDynamicTableSource}.
  */
@@ -63,13 +62,16 @@ public class DorisRowDataInputFormat extends 
RichInputFormat<RowData, DorisTable
     private ScalaValueReader scalaValueReader;
     private transient boolean hasNext;
 
-    private RowType rowType;
+    private final DorisRowConverter rowConverter;
 
-    public DorisRowDataInputFormat(DorisOptions options, 
List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions, 
RowType rowType) {
+    public DorisRowDataInputFormat(DorisOptions options,
+                                   List<PartitionDefinition> dorisPartitions,
+                                   DorisReadOptions readOptions,
+                                   RowType rowType) {
         this.options = options;
         this.dorisPartitions = dorisPartitions;
         this.readOptions = readOptions;
-        this.rowType = rowType;
+        this.rowConverter = new DorisRowConverter(rowType);
     }
 
     @Override
@@ -146,11 +148,7 @@ public class DorisRowDataInputFormat extends 
RichInputFormat<RowData, DorisTable
             return null;
         }
         List next = (List) scalaValueReader.next();
-        GenericRowData genericRowData = new 
GenericRowData(rowType.getFieldCount());
-        for (int i = 0; i < next.size() && i < rowType.getFieldCount(); i++) {
-            Object value = deserialize(rowType.getTypeAt(i), next.get(i));
-            genericRowData.setField(i, value);
-        }
+        RowData genericRowData = rowConverter.convert(next);
         //update hasNext after we've read the record
         hasNext = scalaValueReader.hasNext();
         return genericRowData;
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java
new file mode 100644
index 0000000..cf22009
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.deserialization;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.doris.flink.utils.FactoryMocks.PHYSICAL_TYPE;
+import static org.junit.Assert.assertEquals;
+
+public class RowDataDeserializationSchemaTest {
+
+    /** Deserializes two single-column records and compares the string form of the emitted rows. */
+    @Test
+    public void deserializeTest() throws Exception {
+        RowDataDeserializationSchema schema = new RowDataDeserializationSchema(PHYSICAL_TYPE);
+        SimpleCollector sink = new SimpleCollector();
+        for (String value : Arrays.asList("flink", "doris")) {
+            schema.deserialize(Arrays.asList(value), sink);
+        }
+
+        List<String> rendered = sink.list.stream()
+                .map(Object::toString)
+                .collect(Collectors.toList());
+        assertEquals(Arrays.asList("+I(flink)", "+I(doris)"), rendered);
+    }
+
+    /** Collector that keeps every emitted row in memory, in arrival order. */
+    private static class SimpleCollector implements Collector<RowData> {
+        private final List<RowData> list = new ArrayList<>();
+
+        @Override
+        public void collect(RowData row) {
+            list.add(row);
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
new file mode 100644
index 0000000..3489399
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.deserialization.convert;
+
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Unit test for {@link DorisRowConverter}: converts one record that covers sixteen column types
+ * and compares the string form of the resulting row.
+ */
+public class DorisRowConverterTest implements Serializable {
+
+    /** Converts a single record against a sixteen-column schema and checks the rendered row. */
+    @Test
+    public void testConvert(){
+        // one physical column per logical type exercised by this test
+        ResolvedSchema SCHEMA =
+                ResolvedSchema.of(
+                        Column.physical("f1", DataTypes.NULL()),
+                        Column.physical("f2", DataTypes.BOOLEAN()),
+                        Column.physical("f3", DataTypes.FLOAT()),
+                        Column.physical("f4", DataTypes.DOUBLE()),
+                        Column.physical("f5", 
DataTypes.INTERVAL(DataTypes.YEAR())),
+                        Column.physical("f6", 
DataTypes.INTERVAL(DataTypes.DAY())),
+                        Column.physical("f7", DataTypes.TINYINT()),
+                        Column.physical("f8", DataTypes.SMALLINT()),
+                        Column.physical("f9", DataTypes.INT()),
+                        Column.physical("f10", DataTypes.BIGINT()),
+                        Column.physical("f11", DataTypes.DECIMAL(10,2)),
+                        Column.physical("f12", 
DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
+                        Column.physical("f13", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+                        Column.physical("f14", DataTypes.DATE()),
+                        Column.physical("f15", DataTypes.CHAR(1)),
+                        Column.physical("f16", DataTypes.VARCHAR(256)));
+
+        DorisRowConverter converter = new DorisRowConverter((RowType) 
SCHEMA.toPhysicalRowDataType().getLogicalType());
+
+        // input values are listed in schema order, one per column
+        List record = Arrays.asList(null,"true",1.2,1.2345,24,10,1,32,64,128, 
BigDecimal.valueOf(10.123),"2021-01-01 08:00:00","2021-01-01 
08:00:00","2021-01-01","a","doris");
+        GenericRowData rowData = converter.convert(record);
+        // the DECIMAL(10,2) column is expected to render 10.123 as 10.12
+        
Assert.assertEquals("+I(null,true,1.2,1.2345,24,10,1,32,64,128,10.12,2021-01-01 
08:00:00,2021-01-01 08:00:00,2021-01-01,a,doris)",rowData.toString());
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java
index 5984d91..8e08071 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java
@@ -19,7 +19,10 @@ package org.apache.doris.flink.sink;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.rest.PartitionDefinition;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Properties;
 
 /**
@@ -35,19 +38,20 @@ public class OptionUtils {
         DorisExecutionOptions.Builder builder = 
DorisExecutionOptions.builder();
         builder.setLabelPrefix("doris")
                 .setStreamLoadProp(properties)
-                .setBufferSize(8*1024)
+                .setBufferSize(8 * 1024)
                 .setBufferCount(3)
                 .setDeletable(true)
                 .setCheckInterval(100)
                 .setMaxRetries(2);
         return builder.build();
     }
+
     public static DorisExecutionOptions buildExecutionOptional(Properties 
properties) {
 
         DorisExecutionOptions.Builder builder = 
DorisExecutionOptions.builder();
         builder.setLabelPrefix("doris")
                 .setStreamLoadProp(properties)
-                .setBufferSize(8*1024)
+                .setBufferSize(8 * 1024)
                 .setBufferCount(3)
                 .setDeletable(true)
                 .setCheckInterval(100)
@@ -56,6 +60,10 @@ public class OptionUtils {
     }
 
     public static DorisReadOptions buildDorisReadOptions() {
+        return dorisReadOptionsBuilder().build();
+    }
+
+    public static DorisReadOptions.Builder dorisReadOptionsBuilder() {
         DorisReadOptions.Builder builder = DorisReadOptions.builder();
         builder.setDeserializeArrowAsync(false)
                 .setDeserializeQueueSize(64)
@@ -66,15 +74,20 @@ public class OptionUtils {
                 .setRequestReadTimeoutMs(10000)
                 .setRequestRetries(3)
                 .setRequestTabletSize(1024 * 1024);
-        return builder.build();
+        return builder;
     }
 
     public static DorisOptions buildDorisOptions() {
         DorisOptions.Builder builder = DorisOptions.builder();
-        builder.setFenodes("local:8040")
-                .setTableIdentifier("db_test.table_test")
-                .setUsername("u_test")
-                .setPassword("p_test");
+        builder.setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("db.table")
+                .setUsername("root")
+                .setPassword("");
         return builder.build();
     }
+
+    public static PartitionDefinition buildPartitionDef() {
+        HashSet<Long> tabletIds = new HashSet<>(Arrays.asList(100L));
+        return new PartitionDefinition("db", "table", "127.0.0.1:9060", 
tabletIds, "");
+    }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
new file mode 100644
index 0000000..aa5eb59
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source;
+
+import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Example Tests for the {@link DorisSource}.
+ **/
+public class DorisSourceExampleTest {
+
+    @Test
+    public void testBoundedDorisSource() throws Exception {
+        DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
+                .setDorisOptions(OptionUtils.buildDorisOptions())
+                .setDorisReadOptions(OptionUtils.buildDorisReadOptions())
+                .setDeserializer(new SimpleListDeserializationSchema())
+                .build();
+
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris Source")
+                .addSink(new PrintSinkFunction<>());
+        env.execute("Flink doris source test");
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
new file mode 100644
index 0000000..36cb4f9
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.enumerator;
+
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.doris.flink.source.split.DorisSourceSplitSerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Unit tests for the {@link PendingSplitsCheckpointSerializer}.
+ */
+public class PendingSplitsCheckpointSerializerTest {
+
+    private static void assertCheckpointsEqual(
+            final PendingSplitsCheckpoint expected,
+            final PendingSplitsCheckpoint actual) {
+        Assert.assertEquals(expected.getSplits(), actual.getSplits());
+    }
+
+    @Test
+    public void serializeSplit() throws Exception {
+        final DorisSourceSplit split =
+                new DorisSourceSplit(OptionUtils.buildPartitionDef());
+        PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(Arrays.asList(split));
+
+        final PendingSplitsCheckpointSerializer splitSerializer = new PendingSplitsCheckpointSerializer(DorisSourceSplitSerializer.INSTANCE);
+        byte[] serialized = splitSerializer.serialize(checkpoint);
+        PendingSplitsCheckpoint deserialize = splitSerializer.deserialize(splitSerializer.getVersion(), serialized);
+
+        assertCheckpointsEqual(checkpoint, deserialize);
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
new file mode 100644
index 0000000..a44b96d
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the {@link DorisSourceReader}.
+ */
+public class DorisSourceReaderTest {
+
+    private static DorisSourceReader createReader(TestingReaderContext context) {
+        return new DorisSourceReader<>(
+                OptionUtils.buildDorisOptions(),
+                OptionUtils.buildDorisReadOptions(),
+                new DorisRecordEmitter<>(new SimpleListDeserializationSchema()),
+                context,
+                context.getConfiguration()
+        );
+    }
+
+    private static DorisSourceSplit createTestDorisSplit() throws IOException {
+        return new DorisSourceSplit(OptionUtils.buildPartitionDef());
+    }
+
+    @Test
+    public void testRequestSplitWhenNoSplitRestored() throws Exception {
+        final TestingReaderContext context = new TestingReaderContext();
+        final DorisSourceReader reader = createReader(context);
+
+        reader.start();
+        reader.close();
+        assertEquals(1, context.getNumSplitRequests());
+    }
+
+    @Test
+    public void testNoSplitRequestWhenSplitRestored() throws Exception {
+        final TestingReaderContext context = new TestingReaderContext();
+        final DorisSourceReader reader = createReader(context);
+
+        reader.addSplits(Collections.singletonList(createTestDorisSplit()));
+        reader.start();
+        reader.close();
+
+        assertEquals(0, context.getNumSplitRequests());
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java
new file mode 100644
index 0000000..253f2ea
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.reader;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A testing implementation of the {@link SourceReaderContext}.
+ */
+public class TestingReaderContext implements SourceReaderContext {
+
+    private final SourceReaderMetricGroup metrics;
+
+    private final Configuration config;
+
+    private final ArrayList<SourceEvent> sentEvents = new ArrayList<>();
+
+    private int numSplitRequests;
+
+    public TestingReaderContext() {
+        this(new Configuration(), UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+    }
+
+    public TestingReaderContext(Configuration config, SourceReaderMetricGroup metricGroup) {
+        this.config = config;
+        this.metrics = metricGroup;
+    }
+
+    @Override
+    public SourceReaderMetricGroup metricGroup() {
+        return metrics;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return config;
+    }
+
+    @Override
+    public String getLocalHostName() {
+        return "localhost";
+    }
+
+    @Override
+    public int getIndexOfSubtask() {
+        return 0;
+    }
+
+    @Override
+    public void sendSplitRequest() {
+        numSplitRequests++;
+    }
+
+    @Override
+    public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+        sentEvents.add(sourceEvent);
+    }
+
+    @Override
+    public UserCodeClassLoader getUserCodeClassLoader() {
+        return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
+    }
+
+    public int getNumSplitRequests() {
+        return numSplitRequests;
+    }
+
+    public List<SourceEvent> getSentEvents() {
+        return new ArrayList<>(sentEvents);
+    }
+
+    public void clearSentEvents() {
+        sentEvents.clear();
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
new file mode 100644
index 0000000..b116539
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.split;
+
+import org.apache.doris.flink.sink.OptionUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the {@link DorisSourceSplitSerializer}.
+ */
+public class DorisSourceSplitSerializerTest {
+
+    @Test
+    public void serializeSplit() throws Exception {
+        final DorisSourceSplit split =
+                new DorisSourceSplit(OptionUtils.buildPartitionDef());
+
+        DorisSourceSplit deSerialized = serializeAndDeserializeSplit(split);
+        assertEquals(split, deSerialized);
+    }
+
+    private DorisSourceSplit serializeAndDeserializeSplit(DorisSourceSplit split) throws Exception {
+        final DorisSourceSplitSerializer splitSerializer = new DorisSourceSplitSerializer();
+        byte[] serialized = splitSerializer.serialize(split);
+        return splitSerializer.deserialize(splitSerializer.getVersion(), serialized);
+    }
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java
similarity index 62%
copy from flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
copy to flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java
index d9ec6e5..4873cb2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java
@@ -14,20 +14,25 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.flink.deserialization;
+package org.apache.doris.flink.source.split;
 
+import org.junit.Test;
 
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import java.util.Collections;
 
-import java.util.List;
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Unit tests for the {@link DorisSplitRecords} class.
+ */
+public class DorisSplitRecordsTest {
 
-public class SimpleListDeserializationSchema implements DorisDeserializationSchema<List<?>> {
+    @Test
+    public void testEmptySplits() {
+        final String split = "empty";
+        final DorisSplitRecords records = DorisSplitRecords.finishedSplit(split);
 
-    @Override
-    public TypeInformation<List<?>> getProducedType() {
-        return TypeInformation.of(new TypeHint<List<?>>() {
-        });
+        assertEquals(Collections.singleton(split), records.finishedSplits());
     }
+
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
new file mode 100644
index 0000000..533d56b
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.source.DorisSource;
+import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.doris.flink.utils.FactoryMocks;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class DorisDynamicTableSourceTest {
+
+    @Test
+    public void testDorisUseNewApi() {
+        DorisReadOptions.Builder builder = OptionUtils.dorisReadOptionsBuilder();
+        builder.setUseOldApi(false);
+        final DorisDynamicTableSource actualDorisSource = new DorisDynamicTableSource(OptionUtils.buildDorisOptions(), builder.build(), TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertDorisSource(provider);
+    }
+
+    @Test
+    public void testDorisUseNewApiDefault() {
+        final DorisDynamicTableSource actualDorisSource = new DorisDynamicTableSource(OptionUtils.buildDorisOptions(), OptionUtils.buildDorisReadOptions(), TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertDorisSource(provider);
+    }
+
+    @Test
+    public void testDorisUseOldApi() {
+        DorisReadOptions.Builder builder = OptionUtils.dorisReadOptionsBuilder();
+        builder.setUseOldApi(true);
+        final DorisDynamicTableSource actualDorisSource = new DorisDynamicTableSource(OptionUtils.buildDorisOptions(), builder.build(), TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertDorisInputFormat(provider);
+    }
+
+
+    private void assertDorisInputFormat(ScanTableSource.ScanRuntimeProvider provider) {
+        assertThat(provider, instanceOf(InputFormatProvider.class));
+        final InputFormatProvider inputFormatProvider = (InputFormatProvider) provider;
+
+        InputFormat<RowData, DorisTableInputSplit> inputFormat = (InputFormat<RowData, DorisTableInputSplit>) inputFormatProvider.createInputFormat();
+        assertThat(inputFormat, instanceOf(DorisRowDataInputFormat.class));
+    }
+
+
+    private void assertDorisSource(ScanTableSource.ScanRuntimeProvider provider) {
+        assertThat(provider, instanceOf(SourceProvider.class));
+        final SourceProvider sourceProvider = (SourceProvider) provider;
+
+        Source<RowData, DorisSourceSplit, PendingSplitsCheckpoint> source =
+                (Source<RowData, DorisSourceSplit, PendingSplitsCheckpoint>) sourceProvider.createSource();
+        assertThat(source, instanceOf(DorisSource.class));
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java
new file mode 100644
index 0000000..28bbd44
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Utilities for testing instances usually created by {@link FactoryUtil}. */
+public final class FactoryMocks {
+
+    public static final ResolvedSchema SCHEMA =
+            ResolvedSchema.of(
+                    Column.physical("a", DataTypes.STRING()),
+                    Column.physical("b", DataTypes.INT()),
+                    Column.physical("c", DataTypes.BOOLEAN()));
+
+    public static final DataType PHYSICAL_DATA_TYPE = SCHEMA.toPhysicalRowDataType();
+
+    public static final RowType PHYSICAL_TYPE = (RowType) PHYSICAL_DATA_TYPE.getLogicalType();
+
+    public static final ObjectIdentifier IDENTIFIER =
+            ObjectIdentifier.of("default", "default", "t1");
+
+    private FactoryMocks() {
+        // no instantiation
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to