This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new d933102 spark-doris-connector supports stream load parameter (#52) d933102 is described below commit d9331029425ba6038845c407617e217bad708275 Author: caoliang-web <71004656+caoliang-...@users.noreply.github.com> AuthorDate: Mon Oct 31 09:49:21 2022 +0800 spark-doris-connector supports stream load parameter (#52) spark-doris-connector supports stream load parameter --- .../org/apache/doris/spark/DorisStreamLoad.java | 32 ++++++++++++++++++++++ .../doris/spark/cfg/ConfigurationOptions.java | 2 ++ .../scala/org/apache/doris/spark/sql/Utils.scala | 8 +++++- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index 68e513c..f375c76 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Base64; import java.util.HashMap; import java.util.Calendar; +import java.util.Properties; /** @@ -69,6 +70,7 @@ public class DorisStreamLoad implements Serializable{ private String columns; private String[] dfColumns; private String maxFilterRatio; + private Map<String,String> streamLoadProp; public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) { this.hostPort = hostPort; @@ -93,6 +95,9 @@ public class DorisStreamLoad implements Serializable{ this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS); this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO); + this.streamLoadProp=getStreamLoadProp(settings); + + } @@ -112,6 +117,7 @@ public class DorisStreamLoad implements Serializable{ this.dfColumns = dfColumns; this.maxFilterRatio = 
settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO); + this.streamLoadProp=getStreamLoadProp(settings); } public String getLoadUrlStr() { @@ -147,6 +153,20 @@ public class DorisStreamLoad implements Serializable{ conn.setDoOutput(true); conn.setDoInput(true); + if(streamLoadProp != null ){ + streamLoadProp.forEach((k,v) -> { + if(streamLoadProp.containsKey("format")){ + return; + } + if(streamLoadProp.containsKey("strip_outer_array")) { + return; + } + if(streamLoadProp.containsKey("read_json_by_line")){ + return; + } + conn.addRequestProperty(k, v); + }); + } conn.addRequestProperty("format", "json"); conn.addRequestProperty("strip_outer_array", "true"); return conn; @@ -272,4 +292,16 @@ public class DorisStreamLoad implements Serializable{ } } } + + public Map<String,String> getStreamLoadProp(SparkSettings sparkSettings){ + Map<String,String> streamLoadPropMap = new HashMap<>(); + Properties properties = sparkSettings.asProperties(); + for (String key : properties.stringPropertyNames()) { + if( key.contains(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)){ + String subKey = key.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length()); + streamLoadPropMap.put(subKey,properties.getProperty(key)); + } + } + return streamLoadPropMap; + } } diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index e3c55d6..8cc4477 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -73,4 +73,6 @@ public interface ConfigurationOptions { int SINK_MAX_RETRIES_DEFAULT = 1; String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio"; + + String STREAM_LOAD_PROP_PREFIX = "doris.sink.properties."; } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala index 6b66646..b0e2e15 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala @@ -103,7 +103,13 @@ private[sql] object Utils { */ def params(parameters: Map[String, String], logger: Logger) = { // '.' seems to be problematic when specifying the options - val dottedParams = parameters.map { case (k, v) => (k.replace('_', '.'), v)} + val dottedParams = parameters.map { case (k, v) => + if (k.startsWith("sink.properties.")){ + (k,v) + }else { + (k.replace('_', '.'), v) + } + } val preferredTableIdentifier = dottedParams.get(ConfigurationOptions.DORIS_TABLE_IDENTIFIER) .orElse(dottedParams.get(ConfigurationOptions.TABLE_IDENTIFIER)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org