This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7fddf0a throw exception when commit transaction failed (#206) 7fddf0a is described below commit 7fddf0aea9ef70caa42e8b4b993395a6383616f5 Author: zhaorongsheng <zhaorongsh...@users.noreply.github.com> AuthorDate: Wed Jun 5 15:44:17 2024 +0800 throw exception when commit transaction failed (#206) --- .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java | 7 ++++--- .../src/main/scala/org/apache/doris/spark/load/StreamLoader.scala | 8 +++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 6b1708d..7f97516 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -328,10 +328,11 @@ public class DorisStreamLoad implements Serializable { String loadResult = EntityUtils.toString(response.getEntity()); Map<String, String> res = MAPPER.readValue(loadResult, new TypeReference<HashMap<String, String>>() { }); - if (res.get("status").equals("Fail") && !ResponseUtil.isCommitted(res.get("msg"))) { - throw new StreamLoadException("Commit failed " + loadResult); + if (res.get("status").equals("Success") || ResponseUtil.isCommitted(res.get("msg"))) { + LOG.info("commit transaction {} succeed, load result: {}.", txnId, loadResult); } else { - LOG.info("load result {}", loadResult); + LOG.error("commit transaction {} failed. load result: {}", txnId, loadResult); + throw new StreamLoadException("Commit failed " + loadResult); } } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala index 5986c08..9481b6f 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala @@ -151,10 +151,12 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader if (response.getEntity != null) { val loadResult = EntityUtils.toString(response.getEntity) val res = MAPPER.readValue(loadResult, new TypeReference[util.HashMap[String, String]]() {}) - if (res.get("status") == "Fail" && !ResponseUtil.isCommitted(res.get("msg"))) throw new StreamLoadException("Commit failed " + loadResult) - else LOG.info("load result {}", loadResult) + if (res.get("status") == "Success" || ResponseUtil.isCommitted(res.get("msg"))) LOG.info("commit transaction {} succeed, load result: {}.", msg.value, loadResult) + else { + LOG.error("commit transaction {} failed. load result: {}", msg.value, loadResult) + throw new StreamLoadException("Commit failed " + loadResult) + } } - } match { case Success(_) => client.close() case Failure(e) => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org