This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new addfff7 support use char like \x01 in flink-doris-sink column & line delimiter (#6937) addfff7 is described below commit addfff74c46f052e1201e968614d159f0548bb30 Author: wunan1210 <wunan1...@gmail.com> AuthorDate: Fri Oct 29 13:56:52 2021 +0800 support use char like \x01 in flink-doris-sink column & line delimiter (#6937) * support use char like \x01 in flink-doris-sink column & line delimiter * extend imports * add docs --- docs/en/extending-doris/flink-doris-connector.md | 2 +- .../zh-CN/extending-doris/flink-doris-connector.md | 2 +- .../doris/flink/rest/models/RespContent.java | 4 ++ .../flink/table/DorisDynamicOutputFormat.java | 44 +++++++++++++++++++--- .../apache/doris/flink/table/DorisStreamLoad.java | 3 +- 5 files changed, 46 insertions(+), 9 deletions(-) diff --git a/docs/en/extending-doris/flink-doris-connector.md b/docs/en/extending-doris/flink-doris-connector.md index c42d237..961da90 100644 --- a/docs/en/extending-doris/flink-doris-connector.md +++ b/docs/en/extending-doris/flink-doris-connector.md @@ -257,7 +257,7 @@ outputFormat.close(); | sink.batch.size | 100 | Maximum number of lines in a single write BE | | sink.max-retries | 1 | Number of retries after writing BE failed | | sink.batch.interval | 1s | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. | -| sink.properties.* | -- | The stream load parameters.eg:sink.properties.column_separator' = ','.<br /> Support JSON format import, you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'| +| sink.properties.* | -- | The stream load parameters.eg:sink.properties.column_separator' = ','. 
Set 'sink.properties.escape_delimiters' = 'true' if you want to use a control character as a separator; for example, '\\x01' will then be translated to the binary character 0x01.<br /> To import data in JSON format, you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'| ## Doris & Flink Column Type Mapping diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md b/docs/zh-CN/extending-doris/flink-doris-connector.md index 9ea1eba..4a0a8cb 100644 --- a/docs/zh-CN/extending-doris/flink-doris-connector.md +++ b/docs/zh-CN/extending-doris/flink-doris-connector.md @@ -260,7 +260,7 @@ outputFormat.close(); | sink.batch.size | 100 | 单次写BE的最大行数 | | sink.max-retries | 1 | 写BE失败之后的重试次数 | | sink.batch.interval | 1s | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。| -| sink.properties.* | -- | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。<br /> 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' | +| sink.properties.* | -- | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。如果需要特殊字符作为分隔符, 可以加上参数'sink.properties.escape_delimiters' = 'true', '\\x01'会被转换为二进制的0x01<br /> 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' | diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java index b86b3dd..07a356c 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java @@ -93,4 +93,8 @@ public class RespContent { } } + + public String getErrorURL() { + return ErrorURL; + } } diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java 
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 0fd154a..f4f49bd 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -38,11 +38,14 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.StringJoiner; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.apache.flink.table.data.RowData.createFieldGetter; @@ -62,9 +65,11 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { private static final String FORMAT_KEY = "format"; private static final String FORMAT_JSON_VALUE = "json"; private static final String NULL_VALUE = "\\N"; + private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters"; + private static final String ESCAPE_DELIMITERS_DEFAULT = "false"; - private final String fieldDelimiter; - private final String lineDelimiter; + private String fieldDelimiter; + private String lineDelimiter; private final String[] fieldNames; private final boolean jsonFormat; private DorisOptions options; @@ -88,10 +93,26 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { this.options = option; this.readOptions = readOptions; this.executionOptions = executionOptions; - this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, - FIELD_DELIMITER_DEFAULT); - this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, - LINE_DELIMITER_DEFAULT); + + Properties streamLoadProp=executionOptions.getStreamLoadProp(); + + boolean ifEscape = 
Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT)); + if (ifEscape) { + this.fieldDelimiter = escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY, + FIELD_DELIMITER_DEFAULT)); + this.lineDelimiter = escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY, + LINE_DELIMITER_DEFAULT)); + + if (streamLoadProp.contains(ESCAPE_DELIMITERS_KEY)) { + streamLoadProp.remove(ESCAPE_DELIMITERS_KEY); + } + } else { + this.fieldDelimiter = streamLoadProp.getProperty(FIELD_DELIMITER_KEY, + FIELD_DELIMITER_DEFAULT); + this.lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, + LINE_DELIMITER_DEFAULT); + } + this.fieldNames = fieldNames; this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY)); this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; @@ -100,6 +121,17 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } } + private String escapeString( String s) { + Pattern p = Pattern.compile("\\\\x(\\d{2})"); + Matcher m = p.matcher(s); + + StringBuffer buf = new StringBuffer(); + while (m.find()) { + m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1)))); + } + m.appendTail(buf); + return buf.toString(); + } @Override public void configure(Configuration configuration) { diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index c37e640..b897ff2 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -94,7 +94,8 @@ public class DorisStreamLoad implements Serializable { try { RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class); if 
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - throw new StreamLoadException("stream load error: " + respContent.getMessage()); + String errMsg=String.format("stream load error: %s, see more in %s",respContent.getMessage(),respContent.getErrorURL()); + throw new StreamLoadException(errMsg); } } catch (IOException e) { throw new StreamLoadException(e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org