This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fddf0a  throw exception when commit transaction failed (#206)
7fddf0a is described below

commit 7fddf0aea9ef70caa42e8b4b993395a6383616f5
Author: zhaorongsheng <zhaorongsh...@users.noreply.github.com>
AuthorDate: Wed Jun 5 15:44:17 2024 +0800

    throw exception when commit transaction failed (#206)
---
 .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java    | 7 ++++---
 .../src/main/scala/org/apache/doris/spark/load/StreamLoader.scala | 8 +++++---
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 6b1708d..7f97516 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -328,10 +328,11 @@ public class DorisStreamLoad implements Serializable {
                 String loadResult = EntityUtils.toString(response.getEntity());
                 Map<String, String> res = MAPPER.readValue(loadResult, new 
TypeReference<HashMap<String, String>>() {
                 });
-                if (res.get("status").equals("Fail") && 
!ResponseUtil.isCommitted(res.get("msg"))) {
-                    throw new StreamLoadException("Commit failed " + 
loadResult);
+                if (res.get("status").equals("Success") || 
ResponseUtil.isCommitted(res.get("msg"))) {
+                    LOG.info("commit transaction {} succeed, load result: 
{}.", txnId, loadResult);
                 } else {
-                    LOG.info("load result {}", loadResult);
+                    LOG.error("commit transaction {} failed. load result: {}", 
txnId, loadResult);
+                    throw new StreamLoadException("Commit failed " + 
loadResult);
                 }
             }
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 5986c08..9481b6f 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -151,10 +151,12 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
       if (response.getEntity != null) {
         val loadResult = EntityUtils.toString(response.getEntity)
         val res = MAPPER.readValue(loadResult, new 
TypeReference[util.HashMap[String, String]]() {})
-        if (res.get("status") == "Fail" && 
!ResponseUtil.isCommitted(res.get("msg"))) throw new 
StreamLoadException("Commit failed " + loadResult)
-        else LOG.info("load result {}", loadResult)
+        if (res.get("status") == "Success" || 
ResponseUtil.isCommitted(res.get("msg"))) LOG.info("commit transaction {} 
succeed, load result: {}.", msg.value, loadResult)
+        else {
+          LOG.error("commit transaction {} failed. load result: {}", 
msg.value, loadResult)
+          throw new StreamLoadException("Commit failed " + loadResult)
+        }
       }
-
     } match {
       case Success(_) => client.close()
       case Failure(e) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to