This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 5150d77  [fix](connector) Fixed the issue where the loading task got stuck when stream load ended unexpectedly (#305)
5150d77 is described below

commit 5150d7780dfcd0bfd5b113325267fa3bffcdd0be
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Thu Apr 10 15:36:19 2025 +0800

    [fix](connector) Fixed the issue where the loading task got stuck when stream load ended unexpectedly (#305)
---
 .../client/write/AbstractStreamLoadProcessor.java  | 181 +++++++++++++--------
 .../apache/doris/spark/config/DorisOptions.java    |   4 +-
 .../spark/sql/DorisWriterFailoverITCase.scala      |  41 ++++-
 .../apache/doris/spark/write/DorisDataWriter.scala |   4 +-
 4 files changed, 151 insertions(+), 79 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 169f5a8..eec3f73 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -17,12 +17,6 @@
 package org.apache.doris.spark.client.write;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import org.apache.doris.spark.client.DorisBackendHttpClient;
 import org.apache.doris.spark.client.DorisFrontendClient;
 import org.apache.doris.spark.client.entity.Backend;
@@ -36,6 +30,10 @@ import org.apache.doris.spark.util.EscapeHandler;
 import org.apache.doris.spark.util.HttpUtils;
 import org.apache.doris.spark.util.URLs;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
@@ -51,6 +49,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -66,56 +66,39 @@ import java.util.stream.Collectors;
 
 public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> implements DorisCommitter {
 
-    protected final Logger logger = LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
-
-    protected static final JsonMapper MAPPER = JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).build();
-
+    protected static final JsonMapper MAPPER =
+            JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).build();
     private static final String PARTIAL_COLUMNS = "partial_columns";
     private static final String GROUP_COMMIT = "group_commit";
-    private static final Set<String> VALID_GROUP_MODE = new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
-
+    private static final Set<String> VALID_GROUP_MODE =
+            new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
+    private static final int arrowBufferSize = 1000;
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
     protected final DorisConfig config;
-
     private final DorisFrontendClient frontend;
     private final DorisBackendHttpClient backendHttpClient;
-
     private final String database;
     private final String table;
-
     private final boolean autoRedirect;
-
     private final boolean isHttpsEnabled;
-
     private final boolean isTwoPhaseCommitEnabled;
-
     private final Map<String, String> properties;
-
     private final DataFormat format;
-
+    private final boolean isGzipCompressionEnabled;
+    private final boolean isPassThrough;
+    private final List<R> recordBuffer = new LinkedList<>();
+    private final int pipeSize;
     protected String columnSeparator;
-
     private byte[] lineDelimiter;
-
-    private final boolean isGzipCompressionEnabled;
-
     private String groupCommit;
-
-    private final boolean isPassThrough;
-
     private PipedOutputStream output;
-
     private boolean createNewBatch = true;
-
     private boolean isFirstRecordOfBatch = true;
-
-    private final List<R> recordBuffer = new LinkedList<>();
-
-    private static final int arrowBufferSize = 1000;
-
     private transient ExecutorService executor;
-
-    private Future<CloseableHttpResponse> requestFuture = null;
+    private Future<StreamLoadResponse> requestFuture = null;
     private volatile String currentLabel;
+    private Exception unexpectedException = null;
 
     public AbstractStreamLoadProcessor(DorisConfig config) throws Exception {
         super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
@@ -132,22 +115,31 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
         // init stream load props
         this.isTwoPhaseCommitEnabled = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC);
         this.format = DataFormat.valueOf(properties.getOrDefault("format", "csv").toUpperCase());
-        this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type"));
+        this.isGzipCompressionEnabled =
+                properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type"));
         if (properties.containsKey(GROUP_COMMIT)) {
             String message = "";
-            if (isTwoPhaseCommitEnabled) message = "group commit does not support two-phase commit";
-            if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS)))
+            if (isTwoPhaseCommitEnabled) {
+                message = "group commit does not support two-phase commit";
+            }
+            if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS))) {
                 message = "group commit does not support partial column updates";
-            if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase()))
+            }
+            if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase())) {
                 message = "Unsupported group commit mode: " + properties.get(GROUP_COMMIT);
-            if (!message.isEmpty()) throw new IllegalArgumentException(message);
+            }
+            if (!message.isEmpty()) {
+                throw new IllegalArgumentException(message);
+            }
             groupCommit = properties.get(GROUP_COMMIT).toLowerCase();
         }
         this.isPassThrough = config.getValue(DorisOptions.DORIS_SINK_STREAMING_PASSTHROUGH);
+        this.pipeSize = config.getValue(DorisOptions.DORIS_SINK_NET_BUFFER_SIZE);
     }
 
     public void load(R row) throws Exception {
         if (createNewBatch) {
+            createNewBatch = false;
             if (autoRedirect) {
                 requestFuture = frontend.requestFrontends((frontEnd, httpClient) ->
                         buildReqAndExec(frontEnd.getHost(), frontEnd.getHttpPort(), httpClient));
@@ -155,14 +147,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
                 requestFuture = backendHttpClient.executeReq((backend, httpClient) ->
                         buildReqAndExec(backend.getHost(), backend.getHttpPort(), httpClient));
             }
-            createNewBatch = false;
         }
         if (isFirstRecordOfBatch) {
             isFirstRecordOfBatch = false;
-        } else if (lineDelimiter != null){
-            output.write(lineDelimiter);
+        } else if (lineDelimiter != null) {
+            writeTo(lineDelimiter);
         }
-        output.write(toFormat(row, format));
+        writeTo(toFormat(row, format));
         currentBatchCount++;
     }
 
@@ -175,23 +166,24 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
             if (!recordBuffer.isEmpty() && DataFormat.ARROW.equals(format)) {
                 List<R> rs = new LinkedList<>(recordBuffer);
                 recordBuffer.clear();
-                output.write(toArrowFormat(rs));
+                writeTo(toArrowFormat(rs));
             }
             output.close();
             logger.info("stream load stopped with {}", currentLabel != null ? currentLabel : "group commit");
-            CloseableHttpResponse res = requestFuture.get();
-            if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-                throw new StreamLoadException("stream load execute failed, status: " + res.getStatusLine().getStatusCode()
-                        + ", msg: " + res.getStatusLine().getReasonPhrase());
-            }
-            String resEntity = EntityUtils.toString(new BufferedHttpEntity(res.getEntity()));
-            logger.info("stream load response: {}", resEntity);
-            StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class);
-            if (response != null && response.isSuccess()) {
-                return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
-            } else {
-                throw new StreamLoadException("stream load execute failed, response: " + resEntity);
+
+            StreamLoadResponse response;
+            try {
+                response = requestFuture.get();
+                if (response == null) {
+                    throw new StreamLoadException("response is null");
+                }
+            } catch (Exception e) {
+                if (unexpectedException != null) {
+                    throw unexpectedException;
+                }
+                throw new StreamLoadException("stream load stop failed", e);
             }
+            return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
         }
         return null;
     }
@@ -211,8 +203,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
                         .getReasonPhrase());
             } else {
                 String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()));
-                if(!checkTransResponse(resEntity)) {
-                    throw new RuntimeException("commit transaction failed, transaction: " + msg + ", resp: " + resEntity);
+                if (!checkTransResponse(resEntity)) {
+                    throw new RuntimeException(
+                            "commit transaction failed, transaction: " + msg + ", resp: " + resEntity);
                 } else {
                     this.logger.info("commit: {} response: {}", msg, resEntity);
                 }
@@ -256,8 +249,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
                         .getReasonPhrase());
             } else {
                 String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()));
-                if(!checkTransResponse(resEntity)) {
-                    throw new RuntimeException("abort transaction failed, transaction: " + msg + ", resp: " + resEntity);
+                if (!checkTransResponse(resEntity)) {
+                    throw new RuntimeException(
+                            "abort transaction failed, transaction: " + msg + ", resp: " + resEntity);
                 } else {
                     this.logger.info("abort: {} response: {}", msg, resEntity);
                 }
@@ -271,11 +265,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
         ObjectMapper objectMapper = new ObjectMapper();
         try {
             String status = objectMapper.readTree(resEntity).get("status").asText();
-           if ("Success".equalsIgnoreCase(status)) {
-               return true;
-           }
+            if ("Success".equalsIgnoreCase(status)) {
+                return true;
+            }
         } catch (JsonProcessingException e) {
-           logger.warn("invalid json response: " + resEntity, e);
+            logger.warn("invalid json response: " + resEntity, e);
         }
         return false;
     }
@@ -341,7 +335,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
         if (config.contains(DorisOptions.DORIS_MAX_FILTER_RATIO)) {
             httpPut.setHeader("max_filter_ratio", config.getValue(DorisOptions.DORIS_MAX_FILTER_RATIO));
         }
-        if (isTwoPhaseCommitEnabled) httpPut.setHeader("two_phase_commit", "true");
+        if (isTwoPhaseCommitEnabled) {
+            httpPut.setHeader("two_phase_commit", "true");
+        }
 
         switch (format) {
             case CSV:
@@ -352,7 +348,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
                 break;
             case JSON:
                 lineDelimiter = properties.getOrDefault("line_delimiter", "\n").getBytes(
-                    StandardCharsets.UTF_8);
+                        StandardCharsets.UTF_8);
                 properties.put("read_json_by_line", "true");
                 break;
             case ARROW:
@@ -384,14 +380,14 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
 
     protected abstract String generateStreamLoadLabel() throws OptionRequiredException;
 
-    private Future<CloseableHttpResponse> buildReqAndExec(String host, Integer port, CloseableHttpClient client) {
+    private Future<StreamLoadResponse> buildReqAndExec(String host, Integer port, CloseableHttpClient client) {
         HttpPut httpPut = new HttpPut(URLs.streamLoad(host, port, database, table, isHttpsEnabled));
         try {
             handleStreamLoadProperties(httpPut);
         } catch (OptionRequiredException e) {
             throw new RuntimeException("stream load handle properties failed", e);
         }
-        PipedInputStream pipedInputStream = new PipedInputStream(4096);
+        PipedInputStream pipedInputStream = new PipedInputStream(pipeSize);
         try {
             output = new PipedOutputStream(pipedInputStream);
         } catch (IOException e) {
@@ -402,15 +398,44 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
             entity = new GzipCompressingEntity(entity);
         }
         httpPut.setEntity(entity);
-
-        logger.info("table {}.{} stream load started for {} on host {}:{}", database, table, currentLabel != null ? currentLabel : "group commit", host, port);
-        return getExecutors().submit(() -> client.execute(httpPut));
+        Thread currentThread = Thread.currentThread();
+
+        logger.info("table {}.{} stream load started for {} on host {}:{}", database, table,
+                currentLabel != null ? currentLabel : "group commit", host, port);
+        return getExecutors().submit(() -> {
+            StreamLoadResponse streamLoadResponse = null;
+            try (CloseableHttpResponse response = client.execute(httpPut)) {
+                // stream load http request finished unexpectedly
+                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                    throw new StreamLoadException(
+                            "stream load failed, status: " + response.getStatusLine().getStatusCode()
+                                    + ", reason: " + response.getStatusLine().getReasonPhrase());
+                }
+                String entityStr = EntityUtils.toString(response.getEntity());
+                streamLoadResponse = MAPPER.readValue(entityStr, StreamLoadResponse.class);
+                logger.info("stream load response: " + entityStr);
+                if (streamLoadResponse == null) {
+                    throw new StreamLoadException("stream load failed, response is null, response: " + entityStr);
+                } else if (!streamLoadResponse.isSuccess()) {
+                    throw new StreamLoadException(
+                            "stream load failed, txnId: " + streamLoadResponse.getTxnId()
+                                    + ", status: " + streamLoadResponse.getStatus()
+                                    + ", msg: " + streamLoadResponse.getMessage());
+                }
+            } catch (Exception e) {
+                logger.error("stream load exception", e);
+                unexpectedException = e;
+                currentThread.interrupt();
+            }
+            return streamLoadResponse;
+        });
     }
 
     @Override
     public void close() throws IOException {
         createNewBatch = true;
         isFirstRecordOfBatch = true;
+        unexpectedException = null;
         frontend.close();
         if (backendHttpClient != null) {
             backendHttpClient.close();
@@ -447,4 +472,16 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
         return executor;
     }
 
+    private void writeTo(byte[] bytes) throws Exception {
+        try {
+            output.write(bytes);
+        } catch (Exception e) {
+            if (unexpectedException != null) {
+                throw unexpectedException;
+            } else {
+                throw e;
+            }
+        }
+    }
+
 }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index a8d29ae..01dad06 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -65,7 +65,7 @@ public class DorisOptions {
 
     public static final ConfigOption<String> DORIS_WRITE_FIELDS = ConfigOptions.name("doris.write.fields").stringType().withoutDefaultValue().withDescription("");
 
-    public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(100000).withDescription("");
+    public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(500000).withDescription("");
 
     public static final ConfigOption<Integer> DORIS_SINK_MAX_RETRIES = ConfigOptions.name("doris.sink.max-retries").intType().defaultValue(0).withDescription("");
 
     public static final ConfigOption<Integer> DORIS_SINK_RETRY_INTERVAL_MS =
ConfigOptions.name("doris.sink.retry.interval.ms").intType().defaultValue(10000).withDescription("The interval at which the Spark connector tries to load the batch of data again after load fails."); @@ -130,5 +130,7 @@ public class DorisOptions { public static final ConfigOption<Boolean> DORIS_READ_BITMAP_TO_BASE64 = ConfigOptions.name("doris.read.bitmap-to-base64").booleanType().defaultValue(false).withDescription(""); + public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE = ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 * 1024).withDescription(""); + } \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala index dd73f53..9c38eed 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala @@ -17,18 +17,21 @@ package org.apache.doris.spark.sql -import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils} import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection} +import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils} import org.apache.doris.spark.rest.models.DataModel +import org.apache.spark.SparkException import org.apache.spark.sql.SparkSession -import org.junit.{Before, Test} +import org.hamcrest.{CoreMatchers, Description, Matcher} +import org.junit.rules.ExpectedException +import org.junit.{Before, Rule, Test} import org.slf4j.LoggerFactory import java.util import java.util.UUID import java.util.concurrent.{Executors, TimeUnit} -import scala.util.control.Breaks._ import scala.collection.JavaConverters._ +import scala.util.control.Breaks._ /** * Test DorisWriter failover. @@ -41,6 +44,12 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase { val TABLE_WRITE_TBL_TASK_RETRY = "tbl_write_tbl_task_retry" val TABLE_WRITE_TBL_PRECOMMIT_FAIL = "tbl_write_tbl_precommit_fail" val TABLE_WRITE_TBL_COMMIT_FAIL = "tbl_write_tbl_commit_fail" + val TABLE_WRITE_TBL_FAIL_BEFORE_STOP = "tbl_write_tbl_fail_before_stop" + + val _thrown: ExpectedException = ExpectedException.none + + @Rule + def thrown: ExpectedException = _thrown @Before def setUp(): Unit = { @@ -223,4 +232,30 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase { LOG.info("Checking DorisWriterFailoverITCase result. testName={}, actual={}, expected={}", testName, actual, expected) assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava) } + + @Test + def testForWriteExceptionBeforeStop(): Unit = { + initializeTable(TABLE_WRITE_TBL_FAIL_BEFORE_STOP, DataModel.DUPLICATE) + val session = SparkSession.builder().master("local[1]").getOrCreate() + try { + val df = session.createDataFrame(Seq( + ("doris", "cn"), + ("spark", "us"), + ("catalog", "uk") + )).toDF("name", "address") + thrown.expect(classOf[SparkException]) + df.write.format("doris") + .option("table.identifier", DATABASE + "." 
+ TABLE_WRITE_TBL_FAIL_BEFORE_STOP) + .option("fenodes", getFenodes) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("doris.sink.properties.partial_columns", "true") + .option("doris.sink.net.buffer.size", "1") + .mode("append") + .save() + } finally { + session.stop() + } + } + } diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala index b6edf3c..bc92a93 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala @@ -28,12 +28,10 @@ import org.apache.spark.sql.types.StructType import java.time.Duration import java.util.concurrent.locks.LockSupport import scala.collection.mutable -import scala.util.{Failure, Random, Success} +import scala.util.{Failure, Success} class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int, taskId: Long, epochId: Long = -1) extends DataWriter[InternalRow] with Logging { - private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE) - private val (writer: DorisWriter[InternalRow], committer: DorisCommitter) = config.getValue(DorisOptions.LOAD_MODE) match { case "stream_load" => (new StreamLoadProcessor(config, schema), new StreamLoadProcessor(config, schema)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
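The heart of the fix is a failure-propagation pattern: the stream-load HTTP request now runs to completion on the executor thread, and if it ends unexpectedly that thread records the exception in unexpectedException and interrupts the thread that is writing into the piped stream, so writeTo()/stop() rethrow the root cause instead of blocking forever on a full pipe. Below is a minimal, self-contained sketch of that interrupt-the-blocked-writer pattern using only the JDK; the names pendingError and writerThread and the tiny pipe size are illustrative, not the connector's actual members.

import java.io.{PipedInputStream, PipedOutputStream}
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference

object InterruptOnFailureSketch {
  def main(args: Array[String]): Unit = {
    val pendingError = new AtomicReference[Exception]() // set by the background task on failure
    val writerThread = Thread.currentThread()           // thread that will block on the pipe

    val in = new PipedInputStream(4)                    // tiny buffer so the writer blocks quickly
    val out = new PipedOutputStream(in)
    val executor = Executors.newSingleThreadExecutor()

    // Stands in for the stream-load HTTP call: it fails, records the cause,
    // and interrupts the writer that is blocked on the full pipe.
    executor.submit(new Runnable {
      override def run(): Unit = {
        Thread.sleep(100)
        pendingError.set(new RuntimeException("simulated stream load failure"))
        writerThread.interrupt()                        // unblocks PipedOutputStream.write(...)
      }
    })

    try {
      while (true) out.write(Array.fill[Byte](1024)(1.toByte)) // blocks once the 4-byte pipe is full
    } catch {
      case e: Exception =>
        // Prefer the recorded root cause over the secondary pipe/interrupt error.
        val root = Option(pendingError.get()).getOrElse(e)
        println(s"writer stopped: ${root.getMessage}")
    } finally {
      executor.shutdownNow()
    }
  }
}

The interrupt turns the writer's indefinite wait inside the piped stream into an InterruptedIOException, which is then replaced by the recorded root cause, mirroring how writeTo() and stop() surface unexpectedException in the patched processor.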
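The patch also makes the buffer between the writer and the stream-load request configurable through the new doris.sink.net.buffer.size option (default 1024 * 1024 bytes); the integration test shrinks it to 1 byte to force the writer to block. A hedged usage sketch for an ordinary batch write follows; the table identifier, FE address, and credentials are placeholders, not values from this commit.

import org.apache.spark.sql.SparkSession

object DorisSinkBufferExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("doris-sink-buffer").getOrCreate()

    val df = spark.createDataFrame(Seq(
      ("doris", "cn"),
      ("spark", "us")
    )).toDF("name", "address")

    df.write.format("doris")
      .option("table.identifier", "example_db.example_tbl") // placeholder database.table
      .option("fenodes", "127.0.0.1:8030")                  // placeholder FE host:http_port
      .option("user", "root")                               // placeholder credentials
      .option("password", "")
      // New in this commit: size in bytes of the pipe feeding the stream-load
      // HTTP request; defaults to 1 MB (1024 * 1024).
      .option("doris.sink.net.buffer.size", (4 * 1024 * 1024).toString)
      .mode("append")
      .save()

    spark.stop()
  }
}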