This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 706310e fix hidden column does not take effect (#117) 706310e is described below commit 706310e73e455f08c9a65b5085e7492c19bd21c8 Author: wudi <676366...@qq.com> AuthorDate: Fri Mar 10 17:49:47 2023 +0800 fix hidden column does not take effect (#117) Co-authored-by: wudi <> --- .../org/apache/doris/flink/sink/DorisSink.java | 1 - .../org/apache/doris/flink/sink/EscapeHandler.java | 7 +- .../doris/flink/sink/writer/DorisStreamLoad.java | 3 +- .../doris/flink/sink/writer/RowDataSerializer.java | 3 +- .../doris/flink/DorisSinkExampleRowData.java | 101 +++++++++++++++++++++ 5 files changed, 108 insertions(+), 7 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index efb530e..d64e488 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -128,7 +128,6 @@ public class DorisSink<IN> implements Sink<IN, DorisCommittable, DorisWriterStat Preconditions.checkNotNull(dorisOptions); Preconditions.checkNotNull(dorisExecutionOptions); Preconditions.checkNotNull(serializer); - EscapeHandler.handleEscape(dorisExecutionOptions.getStreamLoadProp()); if(dorisReadOptions == null) { dorisReadOptions = DorisReadOptions.builder().build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java index 1b9a70f..f83cc79 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java @@ -33,7 +33,7 @@ public class EscapeHandler { public static final String ESCAPE_DELIMITERS_FLAGS = "\\x"; public static final Pattern 
ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9|a-f|A-F]{2})"); - public String escapeString(String source) { + public static String escapeString(String source) { if (source.contains(ESCAPE_DELIMITERS_FLAGS)) { Matcher m = ESCAPE_PATTERN.matcher(source); StringBuffer buf = new StringBuffer(); @@ -46,7 +46,7 @@ public class EscapeHandler { return source; } - public void handle(Properties properties) { + public static void handle(Properties properties) { String fieldDelimiter = properties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT); if (fieldDelimiter.contains(ESCAPE_DELIMITERS_FLAGS)) { @@ -60,7 +60,6 @@ public class EscapeHandler { } public static void handleEscape(Properties properties) { - EscapeHandler handler = new EscapeHandler(); - handler.handle(properties); + handle(properties); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 80760a2..dc33fce 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -25,6 +25,7 @@ import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.models.RespContent; +import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.ResponseUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -105,7 +106,7 @@ public class DorisStreamLoad implements Serializable { 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload")); this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount()); - lineDelimiter 
= streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT).getBytes(); + lineDelimiter = EscapeHandler.escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes(); loadBatchFirstRecord = true; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java index 5d00301..e1ec13e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java @@ -19,6 +19,7 @@ package org.apache.doris.flink.sink.writer; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.deserialization.converter.DorisRowConverter; +import org.apache.doris.flink.sink.EscapeHandler; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; @@ -141,7 +142,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> { } public Builder setFieldDelimiter(String fieldDelimiter) { - this.fieldDelimiter = fieldDelimiter; + this.fieldDelimiter = EscapeHandler.escapeString(fieldDelimiter); return this; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java new file mode 100644 index 0000000..79d36c5 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink; + +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.sink.DorisSink; +import org.apache.doris.flink.sink.writer.LoadConstants; +import org.apache.doris.flink.sink.writer.RowDataSerializer; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Collector; + +import java.util.Properties; +import java.util.UUID; + + +public class DorisSinkExampleRowData { + + public static void main(String[] args) throws Exception{ + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.enableCheckpointing(10000); + env.setParallelism(1); + env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000))); + DorisSink.Builder<RowData> builder = DorisSink.builder(); + + Properties properties = new Properties(); + properties.setProperty("column_separator", ","); + properties.setProperty("line_delimiter", "\n"); +// properties.setProperty("read_json_by_line", "true"); +// properties.setProperty("format", "json"); + DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder.setFenodes("127.0.0.1:8030") + .setTableIdentifier("db.tbl") + .setUsername("root") + .setPassword(""); + DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); + executionBuilder.setLabelPrefix(UUID.randomUUID().toString()) + .setStreamLoadProp(properties); + + //flink rowdata's schema + String[] fields = {"name", "age"}; + DataType[] types = {DataTypes.VARCHAR(256), DataTypes.INT()}; + + builder.setDorisExecutionOptions(executionBuilder.build()) + .setSerializer(RowDataSerializer.builder() //serialize according to rowdata + .setType(LoadConstants.CSV) //.setType(LoadConstants.CSV) + .setFieldDelimiter(",") + .setFieldNames(fields) //.setFieldDelimiter(",") + .setFieldType(types).build()) + .setDorisOptions(dorisBuilder.build()); + + //mock rowdata source + DataStream<RowData> source = env.fromElements("") + .flatMap(new FlatMapFunction<String, RowData>() { + @Override + public void flatMap(String s, Collector<RowData> out) throws Exception { + GenericRowData genericRowData = new GenericRowData(2); + genericRowData.setField(0, StringData.fromString("beijing")); + genericRowData.setField(1, 123); + out.collect(genericRowData); + + GenericRowData genericRowData2 = new
GenericRowData(2); + genericRowData2.setField(0, StringData.fromString("shanghai")); + genericRowData2.setField(1, 1234); + out.collect(genericRowData2); + } + }); + + source.sinkTo(builder.build()); + env.execute("doris test"); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org