This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8d7b7b7 [Enhancement] ADD RowSerializer for doris flink connector (#71) 8d7b7b7 is described below commit 8d7b7b7db59f142cd19683e91c686cc2ed96008d Author: DingGeGe <109070189+dinggege1...@users.noreply.github.com> AuthorDate: Mon Oct 10 16:44:19 2022 +0800 [Enhancement] ADD RowSerializer for doris flink connector (#71) * [Enhancement] ADD RowSerializer for doris flink connector --- .../doris/flink/sink/writer/RowSerializer.java | 107 +++++++++++++++++++++ .../doris/flink/sink/writer/TestRowSerializer.java | 97 +++++++++++++++++++ 2 files changed, 204 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java new file mode 100644 index 0000000..3a07951 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.sink.writer; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.RowRowConverter; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; +import java.io.IOException; +import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; +import static org.apache.doris.flink.sink.writer.LoadConstants.JSON; + +/** + * Serializer for {@link Row}. + * Quick way to support RowSerializer on existing code + * TODO: support original Doris to Row serializer + */ +public class RowSerializer implements DorisRecordSerializer<Row> { + /** + * converter {@link Row} to {@link RowData} + */ + private final RowRowConverter rowRowConverter; + private final RowDataSerializer rowDataSerializer; + + private RowSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter, + boolean enableDelete) { + this.rowRowConverter = RowRowConverter.create(DataTypes.ROW(dataTypes)); + this.rowDataSerializer = RowDataSerializer.builder() + .setFieldNames(fieldNames) + .setFieldType(dataTypes) + .setType(type) + .setFieldDelimiter(fieldDelimiter) + .enableDelete(enableDelete) + .build(); + } + + @Override + public byte[] serialize(Row record) throws IOException{ + RowData rowDataRecord = this.rowRowConverter.toInternal(record); + return this.rowDataSerializer.serialize(rowDataRecord); + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for RowSerializer. 
+ */ + public static class Builder { + private String[] fieldNames; + private DataType[] dataTypes; + private String type; + private String fieldDelimiter; + private boolean deletable; + + public Builder setFieldNames(String[] fieldNames) { + this.fieldNames = fieldNames; + return this; + } + + public Builder setFieldType(DataType[] dataTypes) { + this.dataTypes = dataTypes; + return this; + } + + public Builder setType(String type) { + this.type = type; + return this; + } + + public Builder setFieldDelimiter(String fieldDelimiter) { + this.fieldDelimiter = fieldDelimiter; + return this; + } + + public Builder enableDelete(boolean deletable) { + this.deletable = deletable; + return this; + } + + public RowSerializer build() { + Preconditions.checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type)); + Preconditions.checkNotNull(dataTypes); + Preconditions.checkNotNull(fieldNames); + return new RowSerializer(fieldNames, dataTypes, type, fieldDelimiter, deletable); + } + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java new file mode 100644 index 0000000..6c07289 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * test for RowSerializer. + */ +public class TestRowSerializer { + static Row row; + static DataType[] dataTypes; + static String[] fieldNames; + + @BeforeClass + public static void setUp() { + row = new Row(3); + row.setField(0, 3); + row.setField(1, "test"); + row.setField(2, 60.2); + row.setKind(RowKind.INSERT); + dataTypes = new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()}; + fieldNames = new String[]{"id", "name", "weight"}; + } + + @Test + public void testSerializeCsv() throws IOException { + RowSerializer.Builder builder = RowSerializer.builder(); + builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false); + RowSerializer serializer = builder.build(); + byte[] serializedValue = serializer.serialize(row); + Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue); + } + + @Test + public void testSerializeJson() throws IOException { + RowSerializer.Builder builder = RowSerializer.builder(); + 
builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false); + RowSerializer serializer = builder.build(); + byte[] serializedValue = serializer.serialize(row); + ObjectMapper objectMapper = new ObjectMapper(); + Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){}); + Assert.assertEquals("3", valueMap.get("id")); + Assert.assertEquals("test", valueMap.get("name")); + Assert.assertEquals("60.2", valueMap.get("weight")); + } + + @Test + public void testSerializeCsvWithSign() throws IOException { + RowSerializer.Builder builder = RowSerializer.builder(); + builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true); + RowSerializer serializer = builder.build(); + byte[] serializedValue = serializer.serialize(row); + Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue); + } + + @Test + public void testSerializeJsonWithSign() throws IOException { + RowSerializer.Builder builder = RowSerializer.builder(); + builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true); + RowSerializer serializer = builder.build(); + byte[] serializedValue = serializer.serialize(row); + ObjectMapper objectMapper = new ObjectMapper(); + Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){}); + Assert.assertEquals("3", valueMap.get("id")); + Assert.assertEquals("test", valueMap.get("name")); + Assert.assertEquals("60.2", valueMap.get("weight")); + Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__")); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: 
commits-help@doris.apache.org