This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c66b512  [fix](connector) fix npe when empty partition commit txn (#286)
c66b512 is described below

commit c66b5122089073d19e4fcd4c14851ef9398bf7ab
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Tue Mar 18 17:34:27 2025 +0800

    [fix](connector) fix npe when empty partition commit txn (#286)
---
 .../client/write/AbstractStreamLoadProcessor.java  | 43 ++++++++++++----------
 1 file changed, 23 insertions(+), 20 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 9222d81..9bc3037 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -155,27 +155,30 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
 
     @Override
     public String stop() throws Exception {
-        // arrow format need to send all buffer data before stop
-        if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
-            List<R> rs = new LinkedList<>(recordBuffer);
-            recordBuffer.clear();
-            output.write(toArrowFormat(rs));
-        }
-        output.close();
-        CloseableHttpResponse res = requestFuture.get();
-        if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-            throw new StreamLoadException("stream load execute failed, status: " + res.getStatusLine().getStatusCode()
-                    + ", msg: " + res.getStatusLine().getReasonPhrase());
-        }
-        String resEntity = EntityUtils.toString(new BufferedHttpEntity(res.getEntity()));
-        logger.info("stream load response: {}", resEntity);
-        StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class);
-        if (response != null && response.isSuccess()) {
-            createNewBatch = true;
-            return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
-        } else {
-            throw new StreamLoadException("stream load execute failed, response: " + resEntity);
+        if (requestFuture != null) {
+            // arrow format need to send all buffer data before stop
+            if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
+                List<R> rs = new LinkedList<>(recordBuffer);
+                recordBuffer.clear();
+                output.write(toArrowFormat(rs));
+            }
+            output.close();
+            CloseableHttpResponse res = requestFuture.get();
+            if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                throw new StreamLoadException("stream load execute failed, status: " + res.getStatusLine().getStatusCode()
+                        + ", msg: " + res.getStatusLine().getReasonPhrase());
+            }
+            String resEntity = EntityUtils.toString(new BufferedHttpEntity(res.getEntity()));
+            logger.info("stream load response: {}", resEntity);
+            StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class);
+            if (response != null && response.isSuccess()) {
+                createNewBatch = true;
+                return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
+            } else {
+                throw new StreamLoadException("stream load execute failed, response: " + resEntity);
+            }
         }
+        return null;
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to