This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new efe53f30 [Improve] add partial limit push down (#553)
efe53f30 is described below

commit efe53f304274dc4a6cea3c52b6a18e1b3e48bce4
Author: wudi <676366...@qq.com>
AuthorDate: Wed Feb 12 10:32:54 2025 +0800

    [Improve] add partial limit push down (#553)
---
 .../apache/doris/flink/cfg/DorisReadOptions.java   | 27 ++++++++++++++++++----
 .../source/reader/DorisFlightValueReader.java      |  5 ++++
 .../flink/source/reader/DorisValueReader.java      |  6 +++++
 .../doris/flink/table/DorisDynamicTableSource.java | 10 +++++++-
 4 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 22a77b83..1889ace5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -39,6 +39,8 @@ public class DorisReadOptions implements Serializable {
     private boolean useOldApi;
     private boolean useFlightSql;
     private Integer flightSqlPort;
+    // for flink sql limit push down
+    private Long rowLimit;
 
     public DorisReadOptions(
             String readFields,
@@ -54,7 +56,8 @@ public class DorisReadOptions implements Serializable {
             Boolean deserializeArrowAsync,
             boolean useOldApi,
             boolean useFlightSql,
-            Integer flightSqlPort) {
+            Integer flightSqlPort,
+            Long rowLimit) {
         this.readFields = readFields;
         this.filterQuery = filterQuery;
         this.requestTabletSize = requestTabletSize;
@@ -69,6 +72,7 @@ public class DorisReadOptions implements Serializable {
         this.useOldApi = useOldApi;
         this.useFlightSql = useFlightSql;
         this.flightSqlPort = flightSqlPort;
+        this.rowLimit = rowLimit;
     }
 
     public String getReadFields() {
@@ -135,6 +139,14 @@ public class DorisReadOptions implements Serializable {
         return flightSqlPort;
     }
 
+    public Long getRowLimit() {
+        return rowLimit;
+    }
+
+    public void setRowLimit(Long rowLimit) {
+        this.rowLimit = rowLimit;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -165,7 +177,8 @@ public class DorisReadOptions implements Serializable {
                 && Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
                 && Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
                 && Objects.equals(useFlightSql, that.useFlightSql)
-                && Objects.equals(flightSqlPort, that.flightSqlPort);
+                && Objects.equals(flightSqlPort, that.flightSqlPort)
+                && Objects.equals(rowLimit, that.rowLimit);
     }
 
     @Override
@@ -184,7 +197,8 @@ public class DorisReadOptions implements Serializable {
                 deserializeArrowAsync,
                 useOldApi,
                 useFlightSql,
-                flightSqlPort);
+                flightSqlPort,
+                rowLimit);
     }
 
     public DorisReadOptions copy() {
@@ -202,7 +216,8 @@ public class DorisReadOptions implements Serializable {
                 deserializeArrowAsync,
                 useOldApi,
                 useFlightSql,
-                flightSqlPort);
+                flightSqlPort,
+                rowLimit);
     }
 
     /** Builder of {@link DorisReadOptions}. */
@@ -227,6 +242,7 @@ public class DorisReadOptions implements Serializable {
         private Boolean useOldApi = false;
         private Boolean useFlightSql = ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
         private Integer flightSqlPort;
+        private Long rowLimit;
 
         /**
          * Sets the readFields for doris table to push down projection, such as name,age.
@@ -406,7 +422,8 @@ public class DorisReadOptions implements Serializable {
                     deserializeArrowAsync,
                     useOldApi,
                     useFlightSql,
-                    flightSqlPort);
+                    flightSqlPort,
+                    rowLimit);
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
index fbf05050..0234be0f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
@@ -132,6 +132,11 @@ public class DorisFlightValueReader extends ValueReader implements AutoCloseable
         if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
             sql += " WHERE " + readOptions.getFilterQuery();
         }
+
+        if (readOptions.getRowLimit() != null) {
+            sql += " LIMIT " + readOptions.getRowLimit();
+        }
+
         logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
         return sql;
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index 2db4f798..e55bb7ca 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -74,6 +74,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
     protected String contextId;
     protected Schema schema;
     protected boolean asyncThreadStarted;
+    private long readRowCount = 0L;
 
     public DorisValueReader(
             PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) {
@@ -210,6 +211,10 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
      * @return true if hax next value
      */
     public boolean hasNext() {
+        if (readOptions.getRowLimit() != null && readRowCount >= readOptions.getRowLimit()) {
+            return false;
+        }
+
         boolean hasNext = false;
         if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
             // support deserialize Arrow to RowBatch asynchronously
@@ -275,6 +280,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
             LOG.error(SHOULD_NOT_HAPPEN_MESSAGE);
             throw new ShouldNeverHappenException();
         }
+        readRowCount++;
         return rowBatch.next();
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index a68cf189..e4f27d8b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -58,7 +59,8 @@ public final class DorisDynamicTableSource
         implements ScanTableSource,
                 LookupTableSource,
                 SupportsFilterPushDown,
-                SupportsProjectionPushDown {
+                SupportsProjectionPushDown,
+                SupportsLimitPushDown {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class);
     private final DorisOptions options;
@@ -256,4 +258,10 @@ public final class DorisDynamicTableSource
                 resolvedFilterQuery,
                 physicalRowDataType);
     }
+
+    @Override
+    public void applyLimit(long limit) {
+        // partial limit push down to reduce the amount of data scanned
+        readOptions.setRowLimit(limit);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to