This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 97d2222 [improve]Return specific error information after optimizing the exception (#64) 97d2222 is described below commit 97d2222153f57db4b5616b36c18a7711e5ebcbe4 Author: caoliang-web <71004656+caoliang-...@users.noreply.github.com> AuthorDate: Sat Jan 28 10:17:34 2023 +0800 [improve]Return specific error information after optimizing the exception (#64) * Return specific error information after optimizing the exception --- .../org/apache/doris/spark/DorisStreamLoad.java | 49 +++++++++++----------- .../doris/spark/sql/TestSparkConnector.scala | 4 +- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index 25ed7b1..351ef23 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit; /** * DorisStreamLoad **/ -public class DorisStreamLoad implements Serializable{ +public class DorisStreamLoad implements Serializable { public static final String FIELD_DELIMITER = "\t"; public static final String LINE_DELIMITER = "\n"; public static final String NULL_VALUE = "\\N"; @@ -68,7 +68,7 @@ public class DorisStreamLoad implements Serializable{ private String columns; private String[] dfColumns; private String maxFilterRatio; - private Map<String,String> streamLoadProp; + private Map<String, String> streamLoadProp; private static final long cacheExpireTimeout = 4 * 60; private LoadingCache<String, List<BackendV2.BackendRowV2>> cache; @@ -91,7 +91,7 @@ public class DorisStreamLoad implements Serializable{ this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS); this.maxFilterRatio = 
settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO); - this.streamLoadProp=getStreamLoadProp(settings); + this.streamLoadProp = getStreamLoadProp(settings); cache = CacheBuilder.newBuilder() .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES) .build(new BackendCacheLoader(settings)); @@ -110,7 +110,7 @@ public class DorisStreamLoad implements Serializable{ this.dfColumns = dfColumns; this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO); - this.streamLoadProp=getStreamLoadProp(settings); + this.streamLoadProp = getStreamLoadProp(settings); cache = CacheBuilder.newBuilder() .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES) .build(new BackendCacheLoader(settings)); @@ -142,15 +142,15 @@ public class DorisStreamLoad implements Serializable{ conn.setDoOutput(true); conn.setDoInput(true); - if(streamLoadProp != null ){ - streamLoadProp.forEach((k,v) -> { - if(streamLoadProp.containsKey("format")){ + if (streamLoadProp != null) { + streamLoadProp.forEach((k, v) -> { + if (streamLoadProp.containsKey("format")) { return; } - if(streamLoadProp.containsKey("strip_outer_array")) { + if (streamLoadProp.containsKey("strip_outer_array")) { return; } - if(streamLoadProp.containsKey("read_json_by_line")){ + if (streamLoadProp.containsKey("read_json_by_line")) { return; } conn.addRequestProperty(k, v); @@ -171,6 +171,7 @@ public class DorisStreamLoad implements Serializable{ this.respMsg = respMsg; this.respContent = respContent; } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -199,10 +200,10 @@ public class DorisStreamLoad implements Serializable{ public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException { - List<Map<Object,Object>> dataList = new ArrayList<>(); + List<Map<Object, Object>> dataList = new ArrayList<>(); try { for (List<Object> row : rows) { - Map<Object,Object> dataMap = new HashMap<>(); + Map<Object, Object> dataMap = new HashMap<>(); 
if (dfColumns.length == row.size()) { for (int i = 0; i < dfColumns.length; i++) { dataMap.put(dfColumns[i], row.get(i)); @@ -222,18 +223,18 @@ public class DorisStreamLoad implements Serializable{ public void load(String value) throws StreamLoadException { LoadResponse loadResponse = loadBatch(value); - if(loadResponse.status != 200){ - LOG.info("Streamload Response HTTP Status Error:{}",loadResponse); + if (loadResponse.status != 200) { + LOG.info("Streamload Response HTTP Status Error:{}", loadResponse); throw new StreamLoadException("stream load error: " + loadResponse.respContent); - }else{ - LOG.info("Streamload Response:{}",loadResponse); + } else { ObjectMapper obj = new ObjectMapper(); try { RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class); - if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){ - LOG.info("Streamload Response RES STATUS Error:{}", loadResponse); - throw new StreamLoadException("stream load error: " + respContent.getMessage()); + if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { + LOG.error("Streamload Response RES STATUS Error:{}", loadResponse); + throw new StreamLoadException("stream load error: " + loadResponse); } + LOG.info("Streamload Response:{}", loadResponse); } catch (IOException e) { throw new StreamLoadException(e); } @@ -277,7 +278,7 @@ public class DorisStreamLoad implements Serializable{ } catch (Exception e) { e.printStackTrace(); - String err = "http request exception,load url : "+loadUrlStr+",failed to execute spark streamload with label: " + label; + String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark streamload with label: " + label; LOG.warn(err, e); return new LoadResponse(status, e.getMessage(), err); } finally { @@ -290,13 +291,13 @@ public class DorisStreamLoad implements Serializable{ } } - public Map<String,String> getStreamLoadProp(SparkSettings sparkSettings){ - Map<String,String> streamLoadPropMap = new HashMap<>(); 
+ public Map<String, String> getStreamLoadProp(SparkSettings sparkSettings) { + Map<String, String> streamLoadPropMap = new HashMap<>(); Properties properties = sparkSettings.asProperties(); for (String key : properties.stringPropertyNames()) { - if( key.contains(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)){ + if (key.contains(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)) { String subKey = key.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length()); - streamLoadPropMap.put(subKey,properties.getProperty(key)); + streamLoadPropMap.put(subKey, properties.getProperty(key)); } } return streamLoadPropMap; @@ -310,7 +311,7 @@ public class DorisStreamLoad implements Serializable{ BackendV2.BackendRowV2 backend = backends.get(0); return backend.getIp() + ":" + backend.getHttpPort(); } catch (ExecutionException e) { - throw new RuntimeException("get backends info fail",e); + throw new RuntimeException("get backends info fail", e); } } diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala index 09faf39..54771df 100644 --- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala +++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala @@ -58,7 +58,7 @@ class TestSparkConnector { ("zhangsan", "m"), ("lisi", "f"), ("wangwu", "m") - )) + )).toDF("name","gender") df.write .format("doris") .option("doris.fenodes", dorisFeNodes) @@ -66,7 +66,7 @@ class TestSparkConnector { .option("user", dorisUser) .option("password", dorisPwd) //specify your field - .option("doris.write.field", "name,gender") + .option("doris.write.fields", "name,gender") .option("sink.batch.size",2) .option("sink.max-retries",2) .save() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, 
e-mail: commits-h...@doris.apache.org