This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b73ca39  [fix] fix the enable.batch-mode option not working (#372)
b73ca39 is described below

commit b73ca3925a7a781f620b8cac9cf3c0e958f32090
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Sat Apr 27 13:08:13 2024 +0800

    [fix] fix the enable.batch-mode option not working (#372)
    
    * fix the enable.batch-mode option not working
---
 .../main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java    | 3 +++
 .../src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java   | 2 +-
 .../main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java    | 2 +-
 .../src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java    | 2 +-
 4 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index c5a9f66..779a296 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -293,6 +293,9 @@ public class DorisExecutionOptions implements Serializable {
 
         public Builder setBatchMode(Boolean enableBatchMode) {
             this.enableBatchMode = enableBatchMode;
+            if (enableBatchMode.equals(Boolean.TRUE)) {
+                this.writeMode = WriteMode.STREAM_LOAD_BATCH;
+            }
             return this;
         }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
index 1789fdf..89e9182 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
@@ -24,7 +24,7 @@ import java.util.Properties;
 public class DorisStreamOptions implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private Properties prop;
+    private final Properties prop;
     private DorisOptions options;
     private DorisReadOptions readOptions;
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
index 686b8ee..1b3a089 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
@@ -82,7 +82,7 @@ public class DorisCopyWriter<IN>
                         .getRestoredCheckpointId()
                         .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
         LOG.info("restore checkpointId {}", lastCheckpointId);
-        LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
+        LOG.info("labelPrefix {}", executionOptions.getLabelPrefix());
         this.labelPrefix =
                 executionOptions.getLabelPrefix()
                         + "_"
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
index 46c3185..47add12 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -73,8 +73,8 @@ public class DorisSinkBatchExample {
                 .setBufferFlushMaxBytes(8 * 1024)
                 .setBufferFlushMaxRows(900)
                 .setBufferFlushIntervalMs(1000 * 10)
+                // .setBatchMode(true);
                 .setWriteMode(WriteMode.STREAM_LOAD_BATCH);
-
         builder.setDorisReadOptions(readOptionBuilder.build())
                 .setDorisExecutionOptions(executionBuilder.build())
                 .setSerializer(new SimpleStringSerializer())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to