This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5150d77  [fix](connector) Fixed the issue where the loading task got 
stuck when stream load ended unexpectedly  (#305)
5150d77 is described below

commit 5150d7780dfcd0bfd5b113325267fa3bffcdd0be
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Thu Apr 10 15:36:19 2025 +0800

    [fix](connector) Fixed the issue where the loading task got stuck when 
stream load ended unexpectedly  (#305)
---
 .../client/write/AbstractStreamLoadProcessor.java  | 181 +++++++++++++--------
 .../apache/doris/spark/config/DorisOptions.java    |   4 +-
 .../spark/sql/DorisWriterFailoverITCase.scala      |  41 ++++-
 .../apache/doris/spark/write/DorisDataWriter.scala |   4 +-
 4 files changed, 151 insertions(+), 79 deletions(-)

diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 169f5a8..eec3f73 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -17,12 +17,6 @@
 
 package org.apache.doris.spark.client.write;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import org.apache.doris.spark.client.DorisBackendHttpClient;
 import org.apache.doris.spark.client.DorisFrontendClient;
 import org.apache.doris.spark.client.entity.Backend;
@@ -36,6 +30,10 @@ import org.apache.doris.spark.util.EscapeHandler;
 import org.apache.doris.spark.util.HttpUtils;
 import org.apache.doris.spark.util.URLs;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
@@ -51,6 +49,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -66,56 +66,39 @@ import java.util.stream.Collectors;
 
 public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> 
implements DorisCommitter {
 
-    protected final Logger logger = 
LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
-
-    protected static final JsonMapper MAPPER = 
JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
 false).build();
-
+    protected static final JsonMapper MAPPER =
+            
JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
 false).build();
     private static final String PARTIAL_COLUMNS = "partial_columns";
     private static final String GROUP_COMMIT = "group_commit";
-    private static final Set<String> VALID_GROUP_MODE = new 
HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
-
+    private static final Set<String> VALID_GROUP_MODE =
+            new HashSet<>(Arrays.asList("sync_mode", "async_mode", 
"off_mode"));
+    private static final int arrowBufferSize = 1000;
+    protected final Logger logger = 
LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
     protected final DorisConfig config;
-
     private final DorisFrontendClient frontend;
     private final DorisBackendHttpClient backendHttpClient;
-
     private final String database;
     private final String table;
-
     private final boolean autoRedirect;
-
     private final boolean isHttpsEnabled;
-
     private final boolean isTwoPhaseCommitEnabled;
-
     private final Map<String, String> properties;
-
     private final DataFormat format;
-
+    private final boolean isGzipCompressionEnabled;
+    private final boolean isPassThrough;
+    private final List<R> recordBuffer = new LinkedList<>();
+    private final int pipeSize;
     protected String columnSeparator;
-
     private byte[] lineDelimiter;
-
-    private final boolean isGzipCompressionEnabled;
-
     private String groupCommit;
-
-    private final boolean isPassThrough;
-
     private PipedOutputStream output;
-
     private boolean createNewBatch = true;
-
     private boolean isFirstRecordOfBatch = true;
-
-    private final List<R> recordBuffer = new LinkedList<>();
-
-    private static final int arrowBufferSize = 1000;
-
     private transient ExecutorService executor;
 
-    private Future<CloseableHttpResponse> requestFuture = null;
+    private Future<StreamLoadResponse> requestFuture = null;
     private volatile String currentLabel;
+    private Exception unexpectedException = null;
 
     public AbstractStreamLoadProcessor(DorisConfig config) throws Exception {
         super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
@@ -132,22 +115,31 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
         // init stream load props
         this.isTwoPhaseCommitEnabled = 
config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC);
         this.format = DataFormat.valueOf(properties.getOrDefault("format", 
"csv").toUpperCase());
-        this.isGzipCompressionEnabled = 
properties.containsKey("compress_type") && 
"gzip".equals(properties.get("compress_type"));
+        this.isGzipCompressionEnabled =
+                properties.containsKey("compress_type") && 
"gzip".equals(properties.get("compress_type"));
         if (properties.containsKey(GROUP_COMMIT)) {
             String message = "";
-            if (isTwoPhaseCommitEnabled) message = "group commit does not 
support two-phase commit";
-            if (properties.containsKey(PARTIAL_COLUMNS) && 
"true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS)))
+            if (isTwoPhaseCommitEnabled) {
+                message = "group commit does not support two-phase commit";
+            }
+            if (properties.containsKey(PARTIAL_COLUMNS) && 
"true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS))) {
                 message = "group commit does not support partial column 
updates";
-            if 
(!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase()))
+            }
+            if 
(!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase())) {
                 message = "Unsupported group commit mode: " + 
properties.get(GROUP_COMMIT);
-            if (!message.isEmpty()) throw new 
IllegalArgumentException(message);
+            }
+            if (!message.isEmpty()) {
+                throw new IllegalArgumentException(message);
+            }
             groupCommit = properties.get(GROUP_COMMIT).toLowerCase();
         }
         this.isPassThrough = 
config.getValue(DorisOptions.DORIS_SINK_STREAMING_PASSTHROUGH);
+        this.pipeSize = 
config.getValue(DorisOptions.DORIS_SINK_NET_BUFFER_SIZE);
     }
 
     public void load(R row) throws Exception {
         if (createNewBatch) {
+            createNewBatch = false;
             if (autoRedirect) {
                 requestFuture = frontend.requestFrontends((frontEnd, 
httpClient) ->
                         buildReqAndExec(frontEnd.getHost(), 
frontEnd.getHttpPort(), httpClient));
@@ -155,14 +147,13 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                 requestFuture = backendHttpClient.executeReq((backend, 
httpClient) ->
                         buildReqAndExec(backend.getHost(), 
backend.getHttpPort(), httpClient));
             }
-            createNewBatch = false;
         }
         if (isFirstRecordOfBatch) {
             isFirstRecordOfBatch = false;
-        } else if (lineDelimiter != null){
-            output.write(lineDelimiter);
+        } else if (lineDelimiter != null) {
+            writeTo(lineDelimiter);
         }
-        output.write(toFormat(row, format));
+        writeTo(toFormat(row, format));
         currentBatchCount++;
     }
 
@@ -175,23 +166,24 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
             if (!recordBuffer.isEmpty() && DataFormat.ARROW.equals(format)) {
                 List<R> rs = new LinkedList<>(recordBuffer);
                 recordBuffer.clear();
-                output.write(toArrowFormat(rs));
+                writeTo(toArrowFormat(rs));
             }
             output.close();
             logger.info("stream load stopped with {}", currentLabel != null ? 
currentLabel : "group commit");
-            CloseableHttpResponse res = requestFuture.get();
-            if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-                throw new StreamLoadException("stream load execute failed, 
status: " + res.getStatusLine().getStatusCode()
-                        + ", msg: " + res.getStatusLine().getReasonPhrase());
-            }
-            String resEntity = EntityUtils.toString(new 
BufferedHttpEntity(res.getEntity()));
-            logger.info("stream load response: {}", resEntity);
-            StreamLoadResponse response = MAPPER.readValue(resEntity, 
StreamLoadResponse.class);
-            if (response != null && response.isSuccess()) {
-                return isTwoPhaseCommitEnabled ? 
String.valueOf(response.getTxnId()) : null;
-            } else {
-                throw new StreamLoadException("stream load execute failed, 
response: " + resEntity);
+
+            StreamLoadResponse response;
+            try {
+                response = requestFuture.get();
+                if (response == null) {
+                    throw new StreamLoadException("response is null");
+                }
+            } catch (Exception e) {
+                if (unexpectedException != null) {
+                    throw unexpectedException;
+                }
+                throw new StreamLoadException("stream load stop failed", e);
             }
+            return isTwoPhaseCommitEnabled ? 
String.valueOf(response.getTxnId()) : null;
         }
         return null;
     }
@@ -211,8 +203,9 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                         .getReasonPhrase());
             } else {
                 String resEntity = EntityUtils.toString(new 
BufferedHttpEntity(response.getEntity()));
-                if(!checkTransResponse(resEntity)) {
-                    throw new RuntimeException("commit transaction failed, 
transaction: " + msg + ", resp: " + resEntity);
+                if (!checkTransResponse(resEntity)) {
+                    throw new RuntimeException(
+                            "commit transaction failed, transaction: " + msg + 
", resp: " + resEntity);
                 } else {
                     this.logger.info("commit: {} response: {}", msg, 
resEntity);
                 }
@@ -256,8 +249,9 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                         .getReasonPhrase());
             } else {
                 String resEntity = EntityUtils.toString(new 
BufferedHttpEntity(response.getEntity()));
-                if(!checkTransResponse(resEntity)) {
-                    throw new RuntimeException("abort transaction failed, 
transaction: " + msg + ", resp: " + resEntity);
+                if (!checkTransResponse(resEntity)) {
+                    throw new RuntimeException(
+                            "abort transaction failed, transaction: " + msg + 
", resp: " + resEntity);
                 } else {
                     this.logger.info("abort: {} response: {}", msg, resEntity);
                 }
@@ -271,11 +265,11 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
         ObjectMapper objectMapper = new ObjectMapper();
         try {
             String status = 
objectMapper.readTree(resEntity).get("status").asText();
-                if ("Success".equalsIgnoreCase(status)) {
-                    return true;
-                }
+            if ("Success".equalsIgnoreCase(status)) {
+                return true;
+            }
         } catch (JsonProcessingException e) {
-            logger.warn("invalid json response: " +  resEntity, e);
+            logger.warn("invalid json response: " + resEntity, e);
         }
         return false;
     }
@@ -341,7 +335,9 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
         if (config.contains(DorisOptions.DORIS_MAX_FILTER_RATIO)) {
             httpPut.setHeader("max_filter_ratio", 
config.getValue(DorisOptions.DORIS_MAX_FILTER_RATIO));
         }
-        if (isTwoPhaseCommitEnabled) httpPut.setHeader("two_phase_commit", 
"true");
+        if (isTwoPhaseCommitEnabled) {
+            httpPut.setHeader("two_phase_commit", "true");
+        }
 
         switch (format) {
             case CSV:
@@ -352,7 +348,7 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                 break;
             case JSON:
                 lineDelimiter = properties.getOrDefault("line_delimiter", 
"\n").getBytes(
-                    StandardCharsets.UTF_8);
+                        StandardCharsets.UTF_8);
                 properties.put("read_json_by_line", "true");
                 break;
             case ARROW:
@@ -384,14 +380,14 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
 
     protected abstract String generateStreamLoadLabel() throws 
OptionRequiredException;
 
-    private Future<CloseableHttpResponse> buildReqAndExec(String host, Integer 
port, CloseableHttpClient client) {
+    private Future<StreamLoadResponse> buildReqAndExec(String host, Integer 
port, CloseableHttpClient client) {
         HttpPut httpPut = new HttpPut(URLs.streamLoad(host, port, database, 
table, isHttpsEnabled));
         try {
             handleStreamLoadProperties(httpPut);
         } catch (OptionRequiredException e) {
             throw new RuntimeException("stream load handle properties failed", 
e);
         }
-        PipedInputStream pipedInputStream = new PipedInputStream(4096);
+        PipedInputStream pipedInputStream = new PipedInputStream(pipeSize);
         try {
             output = new PipedOutputStream(pipedInputStream);
         } catch (IOException e) {
@@ -402,15 +398,44 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
             entity = new GzipCompressingEntity(entity);
         }
         httpPut.setEntity(entity);
-
-        logger.info("table {}.{} stream load started for {} on host {}:{}", 
database, table, currentLabel != null ? currentLabel : "group commit", host, 
port);
-        return getExecutors().submit(() -> client.execute(httpPut));
+        Thread currentThread = Thread.currentThread();
+
+        logger.info("table {}.{} stream load started for {} on host {}:{}", 
database, table,
+                currentLabel != null ? currentLabel : "group commit", host, 
port);
+        return getExecutors().submit(() -> {
+            StreamLoadResponse streamLoadResponse = null;
+            try (CloseableHttpResponse response = client.execute(httpPut)) {
+                // stream load http request finished unexpectedly
+                if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
+                    throw new StreamLoadException(
+                            "stream load failed, status: " + 
response.getStatusLine().getStatusCode()
+                                    + ", reason: " + 
response.getStatusLine().getReasonPhrase());
+                }
+                String entityStr = EntityUtils.toString(response.getEntity());
+                streamLoadResponse = MAPPER.readValue(entityStr, 
StreamLoadResponse.class);
+                logger.info("stream load response: " + entityStr);
+                if (streamLoadResponse == null) {
+                    throw new StreamLoadException("stream load failed, 
response is null, response: " + entityStr);
+                } else if (!streamLoadResponse.isSuccess()) {
+                    throw new StreamLoadException(
+                            "stream load failed, txnId: " + 
streamLoadResponse.getTxnId()
+                                    + ", status: " + 
streamLoadResponse.getStatus()
+                                    + ", msg: " + 
streamLoadResponse.getMessage());
+                }
+            } catch (Exception e) {
+                logger.error("stream load exception", e);
+                unexpectedException = e;
+                currentThread.interrupt();
+            }
+            return streamLoadResponse;
+        });
     }
 
     @Override
     public void close() throws IOException {
         createNewBatch = true;
         isFirstRecordOfBatch = true;
+        unexpectedException = null;
         frontend.close();
         if (backendHttpClient != null) {
             backendHttpClient.close();
@@ -447,4 +472,16 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
         return executor;
     }
 
+    private void writeTo(byte[] bytes) throws Exception {
+        try {
+            output.write(bytes);
+        } catch (Exception e) {
+            if (unexpectedException != null) {
+                throw unexpectedException;
+            } else {
+                throw e;
+            }
+        }
+    }
+
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index a8d29ae..01dad06 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -65,7 +65,7 @@ public class DorisOptions {
 
     public static final ConfigOption<String> DORIS_WRITE_FIELDS = 
ConfigOptions.name("doris.write.fields").stringType().withoutDefaultValue().withDescription("");
 
-    public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = 
ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(100000).withDescription("");
+    public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = 
ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(500000).withDescription("");
 
     public static final ConfigOption<Integer> DORIS_SINK_MAX_RETRIES = 
ConfigOptions.name("doris.sink.max-retries").intType().defaultValue(0).withDescription("");
     public static final ConfigOption<Integer> DORIS_SINK_RETRY_INTERVAL_MS = 
ConfigOptions.name("doris.sink.retry.interval.ms").intType().defaultValue(10000).withDescription("The
 interval at which the Spark connector tries to load the batch of data again 
after load fails.");
@@ -130,5 +130,7 @@ public class DorisOptions {
 
     public static final ConfigOption<Boolean> DORIS_READ_BITMAP_TO_BASE64 = 
ConfigOptions.name("doris.read.bitmap-to-base64").booleanType().defaultValue(false).withDescription("");
 
+    public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE = 
ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 * 
1024).withDescription("");
+
 
 }
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
index dd73f53..9c38eed 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
@@ -17,18 +17,21 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.doris.spark.container.{AbstractContainerTestBase, 
ContainerUtils}
 import 
org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder,
 getDorisQueryConnection}
+import org.apache.doris.spark.container.{AbstractContainerTestBase, 
ContainerUtils}
 import org.apache.doris.spark.rest.models.DataModel
+import org.apache.spark.SparkException
 import org.apache.spark.sql.SparkSession
-import org.junit.{Before, Test}
+import org.hamcrest.{CoreMatchers, Description, Matcher}
+import org.junit.rules.ExpectedException
+import org.junit.{Before, Rule, Test}
 import org.slf4j.LoggerFactory
 
 import java.util
 import java.util.UUID
 import java.util.concurrent.{Executors, TimeUnit}
-import scala.util.control.Breaks._
 import scala.collection.JavaConverters._
+import scala.util.control.Breaks._
 
 /**
  * Test DorisWriter failover.
@@ -41,6 +44,12 @@ class DorisWriterFailoverITCase extends 
AbstractContainerTestBase {
   val TABLE_WRITE_TBL_TASK_RETRY = "tbl_write_tbl_task_retry"
   val TABLE_WRITE_TBL_PRECOMMIT_FAIL = "tbl_write_tbl_precommit_fail"
   val TABLE_WRITE_TBL_COMMIT_FAIL = "tbl_write_tbl_commit_fail"
+  val TABLE_WRITE_TBL_FAIL_BEFORE_STOP = "tbl_write_tbl_fail_before_stop"
+
+  val _thrown: ExpectedException = ExpectedException.none
+
+  @Rule
+  def thrown: ExpectedException = _thrown
 
   @Before
   def setUp(): Unit = {
@@ -223,4 +232,30 @@ class DorisWriterFailoverITCase extends 
AbstractContainerTestBase {
     LOG.info("Checking DorisWriterFailoverITCase result. testName={}, 
actual={}, expected={}", testName, actual, expected)
     assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
   }
+
+  @Test
+  def testForWriteExceptionBeforeStop(): Unit = {
+    initializeTable(TABLE_WRITE_TBL_FAIL_BEFORE_STOP, DataModel.DUPLICATE)
+    val session = SparkSession.builder().master("local[1]").getOrCreate()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris", "cn"),
+        ("spark", "us"),
+        ("catalog", "uk")
+      )).toDF("name", "address")
+      thrown.expect(classOf[SparkException])
+      df.write.format("doris")
+        .option("table.identifier", DATABASE + "." + 
TABLE_WRITE_TBL_FAIL_BEFORE_STOP)
+        .option("fenodes", getFenodes)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("doris.sink.properties.partial_columns", "true")
+        .option("doris.sink.net.buffer.size", "1")
+        .mode("append")
+        .save()
+    } finally {
+      session.stop()
+    }
+  }
+
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index b6edf3c..bc92a93 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -28,12 +28,10 @@ import org.apache.spark.sql.types.StructType
 import java.time.Duration
 import java.util.concurrent.locks.LockSupport
 import scala.collection.mutable
-import scala.util.{Failure, Random, Success}
+import scala.util.{Failure, Success}
 
 class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: 
Int, taskId: Long, epochId: Long = -1) extends DataWriter[InternalRow] with 
Logging {
 
-  private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
-
   private val (writer: DorisWriter[InternalRow], committer: DorisCommitter) =
     config.getValue(DorisOptions.LOAD_MODE) match {
       case "stream_load" => (new StreamLoadProcessor(config, schema), new 
StreamLoadProcessor(config, schema))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to