gnehil commented on code in PR #285:
URL: 
https://github.com/apache/doris-spark-connector/pull/285#discussion_r2014193959


##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java:
##########
@@ -143,14 +142,26 @@ public AbstractStreamLoadProcessor(DorisConfig config) 
throws Exception {
     public void load(R row) throws Exception {
         if (createNewBatch) {
             if (autoRedirect) {
-                requestFuture = frontend.requestFrontends((frontEnd, 
httpClient) ->
-                        buildReqAndExec(frontEnd.getHost(), 
frontEnd.getHttpPort(), httpClient));
+                requestFuture = frontend.requestFrontends((frontEnd, 
httpClient) -> {
+                    if (isTwoPhaseCommitEnabled && frontEnd.getHttpPort() <= 
0) {
+                        throw new IllegalArgumentException("option [" + 
DorisOptions.DORIS_FENODES.getName()
+                                + "] is not in correct format when ["
+                                + DorisOptions.DORIS_SINK_ENABLE_2PC.getName() 
+ " = true"
+                                + "], for example: host:port[,host2:port]");
+                    }
+                    return buildReqAndExec(frontEnd.getHost(), 
frontEnd.getHttpPort(), httpClient);
+                });
             } else {
                 requestFuture = backendHttpClient.executeReq((backend, 
httpClient) ->
                         buildReqAndExec(backend.getHost(), 
backend.getHttpPort(), httpClient));
             }
             createNewBatch = false;
         }
+        if (isFirstRecordOfBatch) {
+            isFirstRecordOfBatch = false;
+        } else {
+            output.write(lineDelimiter);

Review Comment:
   Only the CSV and JSON formats require a line delimiter; writing a line 
delimiter in the ARROW format will cause parsing exceptions.



##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java:
##########
@@ -183,66 +195,94 @@ public String stop() throws Exception {
         return null;
     }
 
+    private void execCommitReq(String host, int port, String msg, 
CloseableHttpClient httpClient) {
+        HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, 
isHttpsEnabled));
+        try {
+            handleCommitHeaders(httpPut, msg);
+        } catch (OptionRequiredException e) {
+            throw new RuntimeException("stream load handle commit props 
failed", e);
+        }
+        try {
+            CloseableHttpResponse response = httpClient.execute(httpPut);
+            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                throw new RuntimeException("commit transaction failed, 
transaction: " + msg + ", status: "
+                        + response.getStatusLine().getStatusCode() + ", 
reason: " + response.getStatusLine()
+                        .getReasonPhrase());
+            } else {
+                String resEntity = EntityUtils.toString(new 
BufferedHttpEntity(response.getEntity()));
+                this.logger.info("commit: {} response: {}", msg, resEntity);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("commit transaction failed, 
transaction: " + msg, e);
+        }
+    }
+
     @Override
     public void commit(String msg) throws Exception {
         if (isTwoPhaseCommitEnabled) {
             logger.info("begin to commit transaction {}", msg);
-            frontend.requestFrontends((frontEnd, httpClient) -> {
-                HttpPut httpPut = new 
HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), 
database, isHttpsEnabled));
-                try {
-                    handleCommitHeaders(httpPut, msg);
-                } catch (OptionRequiredException e) {
-                    throw new RuntimeException("stream load handle commit 
props failed", e);
-                }
-                try {
-                    CloseableHttpResponse response = 
httpClient.execute(httpPut);
-                    if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
-                        throw new RuntimeException("commit transaction failed, 
transaction: " + msg
-                                + ", status: " + 
response.getStatusLine().getStatusCode()
-                                + ", reason: " + 
response.getStatusLine().getReasonPhrase());
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException("commit transaction failed, 
transaction: " + msg, e);
-                }
-                return null;
-            });
+            if (autoRedirect) {
+                frontend.requestFrontends((frontEnd, httpClient) -> {
+                    execCommitReq(frontEnd.getHost(), frontEnd.getHttpPort(), 
msg, httpClient);
+                    return null;
+                });
+            } else {
+                backendHttpClient.executeReq((backend, httpClient) -> {
+                    execCommitReq(backend.getHost(), backend.getHttpPort(), 
msg, httpClient);
+                    return null;
+                });
+            }
             logger.info("success to commit transaction {}", msg);
         }
     }
 
+    private void execAbortReq(String host, int port, String msg, 
CloseableHttpClient httpClient) {
+        HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, 
isHttpsEnabled));
+        try {
+            handleAbortHeaders(httpPut, msg);
+        } catch (OptionRequiredException e) {
+            throw new RuntimeException("stream load handle abort props 
failed", e);
+        }
+        try {
+            CloseableHttpResponse response = httpClient.execute(httpPut);
+            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                throw new RuntimeException("abort transaction failed, 
transaction: " + msg + ", status: "
+                        + response.getStatusLine().getStatusCode() + ", 
reason: " + response.getStatusLine()
+                        .getReasonPhrase());
+            } else {
+                String resEntity = EntityUtils.toString(new 
BufferedHttpEntity(response.getEntity()));
+                this.logger.info("abort: {} response: {}", msg, resEntity);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("abort transaction failed, transaction: 
" + msg, e);
+        }
+    }
+
     @Override
     public void abort(String msg) throws Exception {
         if (isTwoPhaseCommitEnabled) {
             logger.info("begin to abort transaction {}", msg);
-            frontend.requestFrontends((frontEnd, httpClient) -> {
-                HttpPut httpPut = new 
HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), 
database, isHttpsEnabled));
-                try {
-                    handleAbortHeaders(httpPut, msg);
-                } catch (OptionRequiredException e) {
-                    throw new RuntimeException("stream load handle abort props 
failed", e);
-                }
-                try {
-                    CloseableHttpResponse response = 
httpClient.execute(httpPut);
-                    if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
-                        throw new RuntimeException("abort transaction failed, 
transaction: " + msg
-                                + ", status: " + 
response.getStatusLine().getStatusCode()
-                                + ", reason: " + 
response.getStatusLine().getReasonPhrase());
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException("abort transaction failed, 
transaction: " + msg, e);
-                }
-                return null; // Returning null as the callback does not return 
anything
-            });
+            if (autoRedirect) {

Review Comment:
   Same as the commit part: this comment applies to the abort path as well.



##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java:
##########
@@ -183,66 +195,94 @@ public String stop() throws Exception {
         return null;
     }
 
+    private void execCommitReq(String host, int port, String msg, 
CloseableHttpClient httpClient) {
+        HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, 
isHttpsEnabled));
+        try {
+            handleCommitHeaders(httpPut, msg);
+        } catch (OptionRequiredException e) {
+            throw new RuntimeException("stream load handle commit props 
failed", e);
+        }
+        try {
+            CloseableHttpResponse response = httpClient.execute(httpPut);
+            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                throw new RuntimeException("commit transaction failed, 
transaction: " + msg + ", status: "
+                        + response.getStatusLine().getStatusCode() + ", 
reason: " + response.getStatusLine()
+                        .getReasonPhrase());
+            } else {
+                String resEntity = EntityUtils.toString(new 
BufferedHttpEntity(response.getEntity()));
+                this.logger.info("commit: {} response: {}", msg, resEntity);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("commit transaction failed, 
transaction: " + msg, e);
+        }
+    }
+
     @Override
     public void commit(String msg) throws Exception {
         if (isTwoPhaseCommitEnabled) {
             logger.info("begin to commit transaction {}", msg);
-            frontend.requestFrontends((frontEnd, httpClient) -> {
-                HttpPut httpPut = new 
HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), 
database, isHttpsEnabled));
-                try {
-                    handleCommitHeaders(httpPut, msg);
-                } catch (OptionRequiredException e) {
-                    throw new RuntimeException("stream load handle commit 
props failed", e);
-                }
-                try {
-                    CloseableHttpResponse response = 
httpClient.execute(httpPut);
-                    if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
-                        throw new RuntimeException("commit transaction failed, 
transaction: " + msg
-                                + ", status: " + 
response.getStatusLine().getStatusCode()
-                                + ", reason: " + 
response.getStatusLine().getReasonPhrase());
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException("commit transaction failed, 
transaction: " + msg, e);
-                }
-                return null;
-            });
+            if (autoRedirect) {

Review Comment:
   It is unnecessary to issue the commit through BE, because the transaction 
will be processed through FE in the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to