This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new c66b512 [fix](connector) fix npe when empty partition commit txn (#286) c66b512 is described below commit c66b5122089073d19e4fcd4c14851ef9398bf7ab Author: gnehil <adamlee...@gmail.com> AuthorDate: Tue Mar 18 17:34:27 2025 +0800 [fix](connector) fix npe when empty partition commit txn (#286) --- .../client/write/AbstractStreamLoadProcessor.java | 43 ++++++++++++---------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java index 9222d81..9bc3037 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java @@ -155,27 +155,30 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, @Override public String stop() throws Exception { - // arrow format need to send all buffer data before stop - if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) { - List<R> rs = new LinkedList<>(recordBuffer); - recordBuffer.clear(); - output.write(toArrowFormat(rs)); - } - output.close(); - CloseableHttpResponse res = requestFuture.get(); - if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw new StreamLoadException("stream load execute failed, status: " + res.getStatusLine().getStatusCode() - + ", msg: " + res.getStatusLine().getReasonPhrase()); - } - String resEntity = EntityUtils.toString(new BufferedHttpEntity(res.getEntity())); - logger.info("stream load response: {}", resEntity); - StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class); - if (response != null && response.isSuccess()) { - createNewBatch = true; - return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null; - } else { - throw new StreamLoadException("stream load execute failed, response: " + resEntity); + if (requestFuture != null) { + // arrow format need to send all buffer data before stop + if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) { + List<R> rs = new LinkedList<>(recordBuffer); + recordBuffer.clear(); + output.write(toArrowFormat(rs)); + } + output.close(); + CloseableHttpResponse res = requestFuture.get(); + if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new StreamLoadException("stream load execute failed, status: " + res.getStatusLine().getStatusCode() + + ", msg: " + res.getStatusLine().getReasonPhrase()); + } + String resEntity = EntityUtils.toString(new BufferedHttpEntity(res.getEntity())); + logger.info("stream load response: {}", resEntity); + StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class); + if (response != null && response.isSuccess()) { + createNewBatch = true; + return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null; + } else { + throw new StreamLoadException("stream load execute failed, response: " + resEntity); + } } + return null; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org