This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 706310e  fix hidden column does not take effect (#117)
706310e is described below

commit 706310e73e455f08c9a65b5085e7492c19bd21c8
Author: wudi <676366...@qq.com>
AuthorDate: Fri Mar 10 17:49:47 2023 +0800

    fix hidden column does not take effect (#117)
    
    Co-authored-by: wudi <>
---
 .../org/apache/doris/flink/sink/DorisSink.java     |   1 -
 .../org/apache/doris/flink/sink/EscapeHandler.java |   7 +-
 .../doris/flink/sink/writer/DorisStreamLoad.java   |   3 +-
 .../doris/flink/sink/writer/RowDataSerializer.java |   3 +-
 .../doris/flink/DorisSinkExampleRowData.java       | 101 +++++++++++++++++++++
 5 files changed, 108 insertions(+), 7 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index efb530e..d64e488 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -128,7 +128,6 @@ public class DorisSink<IN> implements Sink<IN, 
DorisCommittable, DorisWriterStat
             Preconditions.checkNotNull(dorisOptions);
             Preconditions.checkNotNull(dorisExecutionOptions);
             Preconditions.checkNotNull(serializer);
-            
EscapeHandler.handleEscape(dorisExecutionOptions.getStreamLoadProp());
             if(dorisReadOptions == null) {
                 dorisReadOptions = DorisReadOptions.builder().build();
             }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java
index 1b9a70f..f83cc79 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java
@@ -33,7 +33,7 @@ public class EscapeHandler {
     public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";
     public static final Pattern ESCAPE_PATTERN = 
Pattern.compile("\\\\x([0-9|a-f|A-F]{2})");
 
-    public String escapeString(String source) {
+    public static String escapeString(String source) {
         if (source.contains(ESCAPE_DELIMITERS_FLAGS)) {
             Matcher m = ESCAPE_PATTERN.matcher(source);
             StringBuffer buf = new StringBuffer();
@@ -46,7 +46,7 @@ public class EscapeHandler {
         return source;
     }
 
-    public void handle(Properties properties) {
+    public static void handle(Properties properties) {
         String fieldDelimiter = properties.getProperty(FIELD_DELIMITER_KEY,
                 FIELD_DELIMITER_DEFAULT);
         if (fieldDelimiter.contains(ESCAPE_DELIMITERS_FLAGS)) {
@@ -60,7 +60,6 @@ public class EscapeHandler {
     }
 
     public static void handleEscape(Properties properties) {
-        EscapeHandler handler = new EscapeHandler();
-        handler.handle(properties);
+        handle(properties);
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 80760a2..dc33fce 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -25,6 +25,7 @@ import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
+import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.ResponseUtil;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -105,7 +106,7 @@ public class DorisStreamLoad implements Serializable {
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(), new 
ExecutorThreadFactory("stream-load-upload"));
         this.recordStream = new RecordStream(executionOptions.getBufferSize(), 
executionOptions.getBufferCount());
-        lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT).getBytes();
+        lineDelimiter = 
EscapeHandler.escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT)).getBytes();
         loadBatchFirstRecord = true;
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
index 5d00301..e1ec13e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.sink.writer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
@@ -141,7 +142,7 @@ public class RowDataSerializer implements 
DorisRecordSerializer<RowData> {
         }
 
         public Builder setFieldDelimiter(String fieldDelimiter) {
-            this.fieldDelimiter = fieldDelimiter;
+            this.fieldDelimiter = EscapeHandler.escapeString(fieldDelimiter);
             return this;
         }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
new file mode 100644
index 0000000..79d36c5
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.LoadConstants;
+import org.apache.doris.flink.sink.writer.RowDataSerializer;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkExampleRowData {
+    // Example job: emits two mock (name, age) RowData records and writes them through a DorisSink as CSV.
+    public static void main(String[] args) throws Exception{
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // NOTE(review): checkpointing is enabled below while the runtime mode is BATCH - confirm this combination is intended
+        env.enableCheckpointing(10000);
+        env.setParallelism(1);
+        
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 
Time.milliseconds(30000)));
+        DorisSink.Builder<RowData> builder = DorisSink.builder();
+        // Stream load properties handed to the execution options: "," as column separator, "\n" as line delimiter.
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+//        properties.setProperty("read_json_by_line", "true");
+//        properties.setProperty("format", "json");
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("db.tbl")
+                .setUsername("root")
+                .setPassword("");
+        DorisExecutionOptions.Builder  executionBuilder = 
DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix(UUID.randomUUID().toString()) // a random UUID is used as the label prefix on every run
+                .setStreamLoadProp(properties);
+
+        // Flink RowData schema: field names and their matching data types (same order).
+        String[] fields = {"name", "age"};
+        DataType[] types = {DataTypes.VARCHAR(256), DataTypes.INT()};
+        // Wire the execution options, the CSV RowData serializer and the Doris connection options into the sink builder.
+        builder.setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(RowDataSerializer.builder()  //serialize 
according to rowdata
+                        .setType(LoadConstants.CSV) 
//.setType(LoadConstants.CSV)
+                        .setFieldDelimiter(",")
+                        .setFieldNames(fields) //.setFieldDelimiter(",")
+                        .setFieldType(types).build())
+                .setDorisOptions(dorisBuilder.build());
+
+        // Mock RowData source: the single placeholder element is expanded by flatMap into two (name, age) rows.
+        DataStream<RowData> source = env.fromElements("")
+                .flatMap(new FlatMapFunction<String, RowData>() {
+                    @Override
+                    public void flatMap(String s, Collector<RowData> out) 
throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(2);
+                        genericRowData.setField(0, 
StringData.fromString("beijing"));
+                        genericRowData.setField(1, 123);
+                        out.collect(genericRowData);
+
+                        GenericRowData genericRowData2 = new GenericRowData(2);
+                        genericRowData2.setField(0, 
StringData.fromString("shanghai"));
+                        genericRowData2.setField(1, 1234);
+                        out.collect(genericRowData2);
+                    }
+                });
+
+        source.sinkTo(builder.build());
+        env.execute("doris test");
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to