This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d7b7b7  [Enhancement] ADD RowSerializer for doris flink connector (#71)
8d7b7b7 is described below

commit 8d7b7b7db59f142cd19683e91c686cc2ed96008d
Author: DingGeGe <109070189+dinggege1...@users.noreply.github.com>
AuthorDate: Mon Oct 10 16:44:19 2022 +0800

    [Enhancement] ADD RowSerializer for doris flink connector (#71)
    
    * [Enhancement] ADD RowSerializer for doris flink connector
---
 .../doris/flink/sink/writer/RowSerializer.java     | 107 +++++++++++++++++++++
 .../doris/flink/sink/writer/TestRowSerializer.java |  97 +++++++++++++++++++
 2 files changed, 204 insertions(+)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java
new file mode 100644
index 0000000..3a07951
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import java.io.IOException;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+
+/**
+ * Serializer for {@link Row}.
+ * Quick way to support RowSerializer on existing code
+ * TODO: support original Doris to Row serializer
+ */
+public class RowSerializer implements DorisRecordSerializer<Row> {
+    /**
+     * converter {@link Row} to {@link RowData}
+     */
+    private final RowRowConverter rowRowConverter;
+    private final RowDataSerializer rowDataSerializer;
+
+    private RowSerializer(String[] fieldNames, DataType[] dataTypes, String 
type, String fieldDelimiter,
+                          boolean enableDelete) {
+        this.rowRowConverter = 
RowRowConverter.create(DataTypes.ROW(dataTypes));
+        this.rowDataSerializer = RowDataSerializer.builder()
+                .setFieldNames(fieldNames)
+                .setFieldType(dataTypes)
+                .setType(type)
+                .setFieldDelimiter(fieldDelimiter)
+                .enableDelete(enableDelete)
+                .build();
+    }
+
+    @Override
+    public byte[] serialize(Row record) throws IOException{
+        RowData rowDataRecord = this.rowRowConverter.toInternal(record);
+        return this.rowDataSerializer.serialize(rowDataRecord);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for RowSerializer.
+     */
+    public static class Builder {
+        private String[] fieldNames;
+        private DataType[] dataTypes;
+        private String type;
+        private String fieldDelimiter;
+        private boolean deletable;
+
+        public Builder setFieldNames(String[] fieldNames) {
+            this.fieldNames = fieldNames;
+            return this;
+        }
+
+        public Builder setFieldType(DataType[] dataTypes) {
+            this.dataTypes = dataTypes;
+            return this;
+        }
+
+        public Builder setType(String type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder enableDelete(boolean deletable) {
+            this.deletable = deletable;
+            return this;
+        }
+
+        public RowSerializer build() {
+            Preconditions.checkState(CSV.equals(type) && fieldDelimiter != 
null || JSON.equals(type));
+            Preconditions.checkNotNull(dataTypes);
+            Preconditions.checkNotNull(fieldNames);
+            return new RowSerializer(fieldNames, dataTypes, type, 
fieldDelimiter, deletable);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
new file mode 100644
index 0000000..6c07289
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link RowSerializer}: CSV and JSON output, each with the delete flag
+ * disabled and enabled.
+ */
+public class TestRowSerializer {
+    static Row row;
+    static DataType[] dataTypes;
+    static String[] fieldNames;
+
+    /** Builds the shared three-field INSERT row and its schema once for all tests. */
+    @BeforeClass
+    public static void setUp() {
+        fieldNames = new String[]{"id", "name", "weight"};
+        dataTypes = new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()};
+        row = new Row(3);
+        row.setKind(RowKind.INSERT);
+        row.setField(0, 3);
+        row.setField(1, "test");
+        row.setField(2, 60.2);
+    }
+
+    /** Serializes the shared row with a "|" delimiter in the given type and delete mode. */
+    private static byte[] serializeSharedRow(String type, boolean enableDelete)
+            throws IOException {
+        RowSerializer serializer = RowSerializer.builder()
+                .setFieldNames(fieldNames)
+                .setFieldType(dataTypes)
+                .setType(type)
+                .setFieldDelimiter("|")
+                .enableDelete(enableDelete)
+                .build();
+        return serializer.serialize(row);
+    }
+
+    /** Decodes the bytes as UTF-8 and parses them as a JSON object of string values. */
+    private static Map<String, String> parseJsonObject(byte[] payload) throws IOException {
+        String json = new String(payload, StandardCharsets.UTF_8);
+        return new ObjectMapper().readValue(json, new TypeReference<Map<String, String>>(){});
+    }
+
+    @Test
+    public void testSerializeCsv() throws IOException {
+        byte[] actual = serializeSharedRow("csv", false);
+        byte[] expected = "3|test|60.2".getBytes(StandardCharsets.UTF_8);
+        Assert.assertArrayEquals(expected, actual);
+    }
+
+    @Test
+    public void testSerializeJson() throws IOException {
+        Map<String, String> fields = parseJsonObject(serializeSharedRow("json", false));
+        Assert.assertEquals("3", fields.get("id"));
+        Assert.assertEquals("test", fields.get("name"));
+        Assert.assertEquals("60.2", fields.get("weight"));
+    }
+
+    @Test
+    public void testSerializeCsvWithSign() throws IOException {
+        byte[] actual = serializeSharedRow("csv", true);
+        byte[] expected = "3|test|60.2|0".getBytes(StandardCharsets.UTF_8);
+        Assert.assertArrayEquals(expected, actual);
+    }
+
+    @Test
+    public void testSerializeJsonWithSign() throws IOException {
+        Map<String, String> fields = parseJsonObject(serializeSharedRow("json", true));
+        Assert.assertEquals("3", fields.get("id"));
+        Assert.assertEquals("test", fields.get("name"));
+        Assert.assertEquals("60.2", fields.get("weight"));
+        Assert.assertEquals("0", fields.get("__DORIS_DELETE_SIGN__"));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to