This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 66ce0157 [Chore] improve dep and add builder comment (#496)
66ce0157 is described below

commit 66ce0157ec56be4e35746701ba2e65763822bac6
Author: wudi <676366...@qq.com>
AuthorDate: Fri Oct 11 14:16:09 2024 +0800

    [Chore] improve dep and add builder comment (#496)
---
 flink-doris-connector/pom.xml                      |  10 +-
 .../doris/flink/cfg/DorisExecutionOptions.java     | 115 +++++++++++++++++++++
 .../org/apache/doris/flink/cfg/DorisOptions.java   |  54 ++++++++--
 .../apache/doris/flink/cfg/DorisReadOptions.java   | 101 +++++++++++++++++-
 .../org/apache/doris/flink/sink/DorisSink.java     |  30 ++++++
 .../org/apache/doris/flink/source/DorisSource.java |  24 +++++
 6 files changed, 315 insertions(+), 19 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index d773339b..775242b0 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -105,14 +105,6 @@ under the License.
             <artifactId>thrift-service</artifactId>
             <version>${thrift-service.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner-loader</artifactId>
@@ -369,7 +361,7 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime-web</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
+            <scope>test</scope>
         </dependency>
         <!--Test-->
         <dependency>
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 7ad8ba97..831a317e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -292,51 +292,112 @@ public class DorisExecutionOptions implements 
Serializable {
         private WriteMode writeMode = WriteMode.STREAM_LOAD;
         private boolean ignoreCommitError = false;
 
+        /**
+         * Sets the interval at which exceptions are checked while loading. The default is 0,
+         * which disables the checker thread.
+         *
+         * @param checkInterval
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setCheckInterval(Integer checkInterval) {
             this.checkInterval = checkInterval;
             return this;
         }
 
+        /**
+         * Sets the maxRetries to load data. In batch mode, this parameter is the number of stream
+         * load retries. In non-batch mode, this parameter is the number of retries in the commit
+         * phase.
+         *
+         * @param maxRetries
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setMaxRetries(Integer maxRetries) {
             this.maxRetries = maxRetries;
             return this;
         }
 
+        /**
+         * Sets the buffer size to cache data for stream load. Only valid in 
non-batch mode.
+         *
+         * @param bufferSize
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setBufferSize(int bufferSize) {
             this.bufferSize = bufferSize;
             return this;
         }
 
+        /**
+         * Sets the buffer count to cache data for stream load. Only valid in 
non-batch mode.
+         *
+         * @param bufferCount
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setBufferCount(int bufferCount) {
             this.bufferCount = bufferCount;
             return this;
         }
 
+        /**
+         * Sets the unique label prefix for stream load.
+         *
+         * @param labelPrefix
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setLabelPrefix(String labelPrefix) {
             this.labelPrefix = labelPrefix;
             return this;
         }
 
+        /**
+         * Sets whether to use cache for stream load. Only valid in non-batch 
mode.
+         *
+         * @param useCache
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setUseCache(boolean useCache) {
             this.useCache = useCache;
             return this;
         }
 
+        /**
+         * Sets the properties for stream load.
+         *
+         * @param streamLoadProp
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setStreamLoadProp(Properties streamLoadProp) {
             this.streamLoadProp = streamLoadProp;
             return this;
         }
 
+        /**
+         * Sets whether to perform the deletion operation for stream load.
+         *
+         * @param enableDelete
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setDeletable(Boolean enableDelete) {
             this.enableDelete = enableDelete;
             return this;
         }
 
+        /**
+         * Disables 2pc (two-phase commit) for stream load.
+         *
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder disable2PC() {
             this.enable2PC = false;
             return this;
         }
 
+        /**
+         * Forces 2pc (two-phase commit) on. By default, 2pc is turned off for the uniq model.
+         *
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder enable2PC() {
             this.enable2PC = true;
             // Force open 2pc
@@ -344,6 +405,12 @@ public class DorisExecutionOptions implements Serializable 
{
             return this;
         }
 
+        /**
+         * Set whether to use batch mode to stream load.
+         *
+         * @param enableBatchMode
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setBatchMode(Boolean enableBatchMode) {
             this.enableBatchMode = enableBatchMode;
             if (enableBatchMode.equals(Boolean.TRUE)) {
@@ -352,41 +419,89 @@ public class DorisExecutionOptions implements 
Serializable {
             return this;
         }
 
+        /**
+         * Set queue size in batch mode.
+         *
+         * @param flushQueueSize
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setFlushQueueSize(int flushQueueSize) {
             this.flushQueueSize = flushQueueSize;
             return this;
         }
 
+        /**
+         * Set the flush interval in milliseconds for stream load in batch mode.
+         *
+         * @param bufferFlushIntervalMs
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
             this.bufferFlushIntervalMs = bufferFlushIntervalMs;
             return this;
         }
 
+        /**
+         * Set the max flush rows for stream load in batch mode.
+         *
+         * @param bufferFlushMaxRows
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) {
             this.bufferFlushMaxRows = bufferFlushMaxRows;
             return this;
         }
 
+        /**
+         * Set the max flush bytes for stream load in batch mode.
+         *
+         * @param bufferFlushMaxBytes
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) {
             this.bufferFlushMaxBytes = bufferFlushMaxBytes;
             return this;
         }
 
+        /**
+         * Set whether to ignore the updateBefore event.
+         *
+         * @param ignoreUpdateBefore
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
             this.ignoreUpdateBefore = ignoreUpdateBefore;
             return this;
         }
 
+        /**
+         * Set the writing mode, only supports STREAM_LOAD and 
STREAM_LOAD_BATCH
+         *
+         * @param writeMode
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setWriteMode(WriteMode writeMode) {
             this.writeMode = writeMode;
             return this;
         }
 
+        /**
+         * Set whether to ignore commit failure errors. This is only valid in 
non-batch mode 2pc.
+         * When ignored, data loss may occur.
+         *
+         * @param ignoreCommitError
+         * @return this DorisExecutionOptions.builder.
+         */
         public Builder setIgnoreCommitError(boolean ignoreCommitError) {
             this.ignoreCommitError = ignoreCommitError;
             return this;
         }
 
+        /**
+         * Build the {@link DorisExecutionOptions}.
+         *
+         * @return a DorisExecutionOptions with the settings made for this 
builder.
+         */
         public DorisExecutionOptions build() {
             // If format=json is set but read_json_by_line is not set, record 
may not be written.
             if (streamLoadProp != null
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index bf6c7a28..69273c9e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -101,47 +101,89 @@ public class DorisOptions extends DorisConnectionOptions {
         private boolean autoRedirect = true;
         private String tableIdentifier;
 
-        /** required, tableIdentifier. */
+        /**
+         * Sets the tableIdentifier for the DorisOptions.
+         *
+         * @param tableIdentifier Doris's database name and table name, such 
as db.tbl
+         * @return this DorisOptions.builder.
+         */
         public Builder setTableIdentifier(String tableIdentifier) {
             this.tableIdentifier = tableIdentifier;
             return this;
         }
 
-        /** optional, user name. */
+        /**
+         * Sets the username of doris cluster.
+         *
+         * @param username Doris cluster username
+         * @return this DorisOptions.builder.
+         */
         public Builder setUsername(String username) {
             this.username = username;
             return this;
         }
 
-        /** optional, password. */
+        /**
+         * Sets the password of doris cluster.
+         *
+         * @param password Doris cluster password
+         * @return this DorisOptions.builder.
+         */
         public Builder setPassword(String password) {
             this.password = password;
             return this;
         }
 
-        /** required, Frontend Http Rest url. */
+        /**
+         * Sets the doris frontend http rest url, such as 
127.0.0.1:8030,127.0.0.2:8030
+         *
+         * @param fenodes
+         * @return this DorisOptions.builder.
+         */
         public Builder setFenodes(String fenodes) {
             this.fenodes = fenodes;
             return this;
         }
 
-        /** optional, Backend Http Port. */
+        /**
+         * Sets the doris backend http rest url, such as 
127.0.0.1:8040,127.0.0.2:8040
+         *
+         * @param benodes
+         * @return this DorisOptions.builder.
+         */
         public Builder setBenodes(String benodes) {
             this.benodes = benodes;
             return this;
         }
 
-        /** not required, fe jdbc url, for lookup query. */
+        /**
+         * Sets the doris fe jdbc url for lookup query, such as 
jdbc:mysql://127.0.0.1:9030
+         *
+         * @param jdbcUrl
+         * @return this DorisOptions.builder.
+         */
         public Builder setJdbcUrl(String jdbcUrl) {
             this.jdbcUrl = jdbcUrl;
             return this;
         }
 
+        /**
+         * Sets the autoRedirect for DorisOptions. If true, stream load will 
be written directly to
+         * fe. If false, it will first get the be list and write directly to 
be.
+         *
+         * @param autoRedirect
+         * @return this DorisOptions.builder.
+         */
         public Builder setAutoRedirect(boolean autoRedirect) {
             this.autoRedirect = autoRedirect;
             return this;
         }
 
+        /**
+         * Build the {@link DorisOptions}.
+         *
+         * @return a DorisOptions with the settings made for this builder.
+         */
         public DorisOptions build() {
             checkNotNull(fenodes, "No fenodes supplied.");
             // multi table load, don't need check
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 937d3286..0448d60a 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -223,76 +223,169 @@ public class DorisReadOptions implements Serializable {
         private Boolean useFlightSql = false;
         private Integer flightSqlPort;
 
+        /**
+         * Sets the readFields for doris table to push down projection, such 
as name,age.
+         *
+         * @param readFields
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setReadFields(String readFields) {
             this.readFields = readFields;
             return this;
         }
 
+        /**
+         * Sets the filterQuery for doris table to push down filter, such as age=18.
+         *
+         * @param filterQuery
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setFilterQuery(String filterQuery) {
             this.filterQuery = filterQuery;
             return this;
         }
 
+        /**
+         * Sets the requestTabletSize for DorisReadOptions. The number of 
Doris Tablets
+         * corresponding to a Partition, the smaller this value is set, the 
more Partitions will be
+         * generated. This improves the parallelism on the Flink side, but at 
the same time puts
+         * more pressure on Doris.
+         *
+         * @param requestTabletSize
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setRequestTabletSize(Integer requestTabletSize) {
             this.requestTabletSize = requestTabletSize;
             return this;
         }
 
+        /**
+         * Sets the request connect timeout for DorisReadOptions.
+         *
+         * @param requestConnectTimeoutMs
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setRequestConnectTimeoutMs(Integer 
requestConnectTimeoutMs) {
             this.requestConnectTimeoutMs = requestConnectTimeoutMs;
             return this;
         }
 
+        /**
+         * Sets the request read timeout for DorisReadOptions.
+         *
+         * @param requestReadTimeoutMs
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setRequestReadTimeoutMs(Integer requestReadTimeoutMs) {
             this.requestReadTimeoutMs = requestReadTimeoutMs;
             return this;
         }
 
+        /**
+         * Sets the timeout time for querying Doris for DorisReadOptions.
+         *
+         * @param requesQueryTimeoutS
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setRequestQueryTimeoutS(Integer requesQueryTimeoutS) {
             this.requestQueryTimeoutS = requesQueryTimeoutS;
             return this;
         }
 
+        /**
+         * Sets the number of retries to send requests to Doris for 
DorisReadOptions.
+         *
+         * @param requestRetries
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setRequestRetries(Integer requestRetries) {
             this.requestRetries = requestRetries;
             return this;
         }
 
+        /**
+         * Sets the read batch size for DorisReadOptions.
+         *
+         * @param requestBatchSize
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setRequestBatchSize(Integer requestBatchSize) {
             this.requestBatchSize = requestBatchSize;
             return this;
         }
 
+        /**
+         * Sets the Memory limit for a single query for DorisReadOptions.
+         *
+         * @param execMemLimit
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setExecMemLimit(Long execMemLimit) {
             this.execMemLimit = execMemLimit;
             return this;
         }
 
+        /**
+         * Sets the size of the internal processing queue used for asynchronous conversion of the
+         * Arrow format.
+         *
+         * @param deserializeQueueSize
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setDeserializeQueueSize(Integer deserializeQueueSize) {
             this.deserializeQueueSize = deserializeQueueSize;
             return this;
         }
 
+        /**
+         * Sets whether to support asynchronous conversion of Arrow format to RowBatch needed for
+         * connector iterations.
+         *
+         * @param deserializeArrowAsync
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) 
{
             this.deserializeArrowAsync = deserializeArrowAsync;
             return this;
         }
 
-        public Builder setUseFlightSql(Boolean useFlightSql) {
-            this.useFlightSql = useFlightSql;
+        /**
+         * Whether to use the legacy source api
+         *
+         * @param useOldApi
+         * @return this DorisReadOptions.builder.
+         */
+        public Builder setUseOldApi(Boolean useOldApi) {
+            this.useOldApi = useOldApi;
             return this;
         }
 
-        public Builder setUseOldApi(Boolean useOldApi) {
-            this.useOldApi = useOldApi;
+        /**
+         * Whether to use arrow flight sql for query, only supports Doris2.1 
and above
+         *
+         * @param useFlightSql
+         * @return this DorisReadOptions.builder.
+         */
+        public Builder setUseFlightSql(Boolean useFlightSql) {
+            this.useFlightSql = useFlightSql;
             return this;
         }
 
+        /**
+         * Sets the flight sql port for DorisReadOptions.
+         *
+         * @param flightSqlPort
+         * @return this DorisReadOptions.builder.
+         */
         public Builder setFlightSqlPort(Integer flightSqlPort) {
             this.flightSqlPort = flightSqlPort;
             return this;
         }
 
+        /**
+         * Build the {@link DorisReadOptions}.
+         *
+         * @return a DorisReadOptions with the settings made for this builder.
+         */
         public DorisReadOptions build() {
             return new DorisReadOptions(
                     readFields,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index fd61d7fd..d8e0d827 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -161,26 +161,56 @@ public class DorisSink<IN>
         private DorisExecutionOptions dorisExecutionOptions;
         private DorisRecordSerializer<IN> serializer;
 
+        /**
+         * Sets the DorisOptions for the DorisSink.
+         *
+         * @param dorisOptions the common options of the doris cluster.
+         * @return this DorisSink.Builder.
+         */
         public Builder<IN> setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
             return this;
         }
 
+        /**
+         * Sets the DorisReadOptions for the DorisSink.
+         *
+         * @param dorisReadOptions the read options of the DorisSink.
+         * @return this DorisSink.Builder.
+         */
         public Builder<IN> setDorisReadOptions(DorisReadOptions 
dorisReadOptions) {
             this.dorisReadOptions = dorisReadOptions;
             return this;
         }
 
+        /**
+         * Sets the DorisExecutionOptions for the DorisSink.
+         *
+         * @param dorisExecutionOptions the execution options of the DorisSink.
+         * @return this DorisSink.Builder.
+         */
         public Builder<IN> setDorisExecutionOptions(DorisExecutionOptions 
dorisExecutionOptions) {
             this.dorisExecutionOptions = dorisExecutionOptions;
             return this;
         }
 
+        /**
+         * Sets the {@link DorisRecordSerializer serializer} that transforms 
incoming records to
+         * DorisRecord
+         *
+         * @param serializer
+         * @return this DorisSink.Builder.
+         */
         public Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) 
{
             this.serializer = serializer;
             return this;
         }
 
+        /**
+         * Build the {@link DorisSink}.
+         *
+         * @return a DorisSink with the settings made for this builder.
+         */
         public DorisSink<IN> build() {
             Preconditions.checkNotNull(dorisOptions);
             Preconditions.checkNotNull(dorisExecutionOptions);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 1b05453a..19a7fe36 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -168,21 +168,40 @@ public class DorisSource<OUT>
             boundedness = Boundedness.BOUNDED;
         }
 
+        /**
+         * Sets the DorisOptions for the DorisSource.
+         *
+         * @param options the common options of the doris cluster.
+         * @return this DorisSourceBuilder.
+         */
         public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) {
             this.options = options;
             return this;
         }
 
+        /**
+         * Sets the DorisReadOptions for the DorisSource.
+         *
+         * @param readOptions the read options of the DorisSource.
+         * @return this DorisSourceBuilder.
+         */
         public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions 
readOptions) {
             this.readOptions = readOptions;
             return this;
         }
 
+        /** Sets the Boundedness for the DorisSource. Currently only BOUNDED is supported. */
         public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) 
{
             this.boundedness = boundedness;
             return this;
         }
 
+        /**
+         * Sets the {@link DorisDeserializationSchema deserializer} of the 
Record for DorisSource.
+         *
+         * @param deserializer the deserializer for Doris Record.
+         * @return this DorisSourceBuilder.
+         */
         public DorisSourceBuilder<OUT> setDeserializer(
                 DorisDeserializationSchema<OUT> deserializer) {
             this.deserializer = deserializer;
@@ -194,6 +213,11 @@ public class DorisSource<OUT>
             return this;
         }
 
+        /**
+         * Build the {@link DorisSource}.
+         *
+         * @return a DorisSource with the settings made for this builder.
+         */
         public DorisSource<OUT> build() {
             if (readOptions == null) {
                 readOptions = DorisReadOptions.builder().build();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to