This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new c941b55 [Improve] Optimize the return value of DorisRecordSerializer (#247) c941b55 is described below commit c941b55a3505c9d9ce39fd6fd098945628f0aa9c Author: wudi <676366...@qq.com> AuthorDate: Wed Nov 29 10:05:36 2023 +0800 [Improve] Optimize the return value of DorisRecordSerializer (#247) --- .../doris/flink/sink/batch/DorisBatchWriter.java | 17 +++-- .../doris/flink/sink/writer/DorisWriter.java | 12 ++-- .../flink/sink/writer/serializer/DorisRecord.java | 74 ++++++++++++++++++++++ .../writer/serializer/DorisRecordSerializer.java | 4 +- .../serializer/JsonDebeziumSchemaSerializer.java | 6 +- .../serializer/RecordWithMetaSerializer.java | 6 +- .../sink/writer/serializer/RowDataSerializer.java | 5 +- .../sink/writer/serializer/RowSerializer.java | 3 +- .../writer/serializer/SimpleStringSerializer.java | 4 +- .../convert/DorisRowConverterTest.java | 2 +- .../writer/TestJsonDebeziumSchemaSerializer.java | 8 +-- .../flink/sink/writer/TestRowDataSerializer.java | 8 +-- .../doris/flink/sink/writer/TestRowSerializer.java | 8 +-- 13 files changed, 112 insertions(+), 45 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java index 08a58cd..567758d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java @@ -20,11 +20,11 @@ package org.apache.doris.flink.sink.batch; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.LabelGenerator; +import org.apache.doris.flink.sink.writer.serializer.DorisRecord; +import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -93,18 +93,17 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> { checkFlushException(); String db = this.database; String tbl = this.table; - Tuple2<String, byte[]> rowTuple = serializer.serialize(in); - if(rowTuple == null || rowTuple.f1 == null){ + DorisRecord record = serializer.serialize(in); + if(record == null || record.getRow() == null){ //ddl or value is null return; } //multi table load - if(rowTuple.f0 != null){ - String[] tableInfo = rowTuple.f0.split("\\."); - db = tableInfo[0]; - tbl = tableInfo[1]; + if(record.getTableIdentifier() != null){ + db = record.getDatabase(); + tbl = record.getTable(); } - batchStreamLoad.writeRecord(db, tbl, rowTuple.f1); + batchStreamLoad.writeRecord(db, tbl, record.getRow()); } @Override public void flush(boolean flush) throws IOException, InterruptedException { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index cb330c3..d5e5bc1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -27,12 +27,12 @@ import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpUtil; +import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; @@ -152,14 +152,14 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori checkLoadException(); String tableKey = dorisOptions.getTableIdentifier(); - Tuple2<String, byte[]> rowTuple = serializer.serialize(in); - if(rowTuple == null || rowTuple.f1 == null){ + DorisRecord record = serializer.serialize(in); + if(record == null || record.getRow() == null){ //ddl or value is null return; } //multi table load - if(rowTuple.f0 != null){ - tableKey = rowTuple.f0; + if(record.getTableIdentifier() != null){ + tableKey = record.getTableIdentifier(); } DorisStreamLoad streamLoader = getStreamLoader(tableKey); @@ -171,7 +171,7 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori loadingMap.put(tableKey, true); globalLoading = true; } - streamLoader.writeRecord(rowTuple.f1); + streamLoader.writeRecord(record.getRow()); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java new file mode 100644 index 0000000..02ae3bc --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.sink.writer.serializer; + +import java.io.Serializable; + +public class DorisRecord implements Serializable { + private String database; + private String table; + private byte[] row; + + public DorisRecord() { + } + + public DorisRecord(String database, String table, byte[] row) { + this.database = database; + this.table = table; + this.row = row; + } + + public String getTableIdentifier(){ + if(database == null || table == null){ + return null; + } + return database + "." + table; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public byte[] getRow() { + return row; + } + + public void setRow(byte[] row) { + this.row = row; + } + + public static DorisRecord of(String database, String table, byte[] row) { + return new DorisRecord(database, table, row); + } + + public static DorisRecord of(byte[] row) { + return new DorisRecord(null, null, row); + } + +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java index aa2b63d..8c699f6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java @@ -17,8 +17,6 @@ package org.apache.doris.flink.sink.writer.serializer; -import org.apache.flink.api.java.tuple.Tuple2; - import java.io.IOException; import java.io.Serializable; @@ -34,6 +32,6 @@ public interface DorisRecordSerializer<T> extends Serializable { * @return [tableIdentifer,byte array] * @throws IOException */ - Tuple2<String, byte[]> serialize(T record) throws IOException; + DorisRecord serialize(T record) throws IOException; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 37390ce..4dc654e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -121,7 +121,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin } @Override - public Tuple2<String, byte[]> serialize(String record) throws IOException { + public DorisRecord serialize(String record) throws IOException { LOG.debug("received debezium json data {} :", record); JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); String op = extractJsonNode(recordRoot, "op"); @@ -146,7 +146,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin addDeleteSign(valueMap, false); break; case OP_UPDATE: - return Tuple2.of(null, extractUpdate(recordRoot)); + return DorisRecord.of(extractUpdate(recordRoot)); case OP_DELETE: valueMap = extractBeforeRow(recordRoot); addDeleteSign(valueMap, true); @@ -155,7 +155,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin LOG.error("parse record fail, unknown op {} in {}", op, record); return null; } - return Tuple2.of(null, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8)); + return DorisRecord.of(objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8)); } /** diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java index 21608a7..74e2853 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java @@ -19,7 +19,6 @@ package org.apache.doris.flink.sink.writer.serializer; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.sink.batch.RecordWithMeta; -import org.apache.flink.api.java.tuple.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +29,7 @@ public class RecordWithMetaSerializer implements DorisRecordSerializer<RecordWit private static final Logger LOG = LoggerFactory.getLogger(RecordWithMetaSerializer.class); @Override - public Tuple2<String, byte[]> serialize(RecordWithMeta record) throws IOException { + public DorisRecord serialize(RecordWithMeta record) throws IOException { if(StringUtils.isBlank(record.getTable()) || StringUtils.isBlank(record.getDatabase()) || record.getRecord() == null){ @@ -38,7 +37,6 @@ public class RecordWithMetaSerializer implements DorisRecordSerializer<RecordWit record.getDatabase(), record.getTable(), record.getRecord()); return null; } - String tableKey = record.getDatabase() + "." + record.getTable(); - return Tuple2.of(tableKey, record.getRecord().getBytes(StandardCharsets.UTF_8)); + return DorisRecord.of(record.getDatabase(), record.getTable(), record.getRecord().getBytes(StandardCharsets.UTF_8)); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java index d29830e..a9830a5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java @@ -20,7 +20,6 @@ package org.apache.doris.flink.sink.writer.serializer; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.deserialization.converter.DorisRowConverter; import org.apache.doris.flink.sink.EscapeHandler; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; @@ -60,7 +59,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> { } @Override - public Tuple2<String, byte[]> serialize(RowData record) throws IOException{ + public DorisRecord serialize(RowData record) throws IOException{ int maxIndex = Math.min(record.getArity(), fieldNames.length); String valString; if (JSON.equals(type)) { @@ -70,7 +69,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> { } else { throw new IllegalArgumentException("The type " + type + " is not supported!"); } - return Tuple2.of(null, valString.getBytes(StandardCharsets.UTF_8)); + return DorisRecord.of(valString.getBytes(StandardCharsets.UTF_8)); } public String buildJsonString(RowData record, int maxIndex) throws IOException { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java index 3a0e610..a3cf61c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java @@ -17,7 +17,6 @@ package org.apache.doris.flink.sink.writer.serializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.conversion.RowRowConverter; @@ -55,7 +54,7 @@ public class RowSerializer implements DorisRecordSerializer<Row> { } @Override - public Tuple2<String, byte[]> serialize(Row record) throws IOException{ + public DorisRecord serialize(Row record) throws IOException{ RowData rowDataRecord = this.rowRowConverter.toInternal(record); return this.rowDataSerializer.serialize(rowDataRecord); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java index 206e641..aa2ad85 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java @@ -28,7 +28,7 @@ import java.nio.charset.StandardCharsets; public class SimpleStringSerializer implements DorisRecordSerializer<String> { @Override - public Tuple2<String, byte[]> serialize(String record) throws IOException { - return Tuple2.of(null, record.getBytes(StandardCharsets.UTF_8)); + public DorisRecord serialize(String record) throws IOException { + return DorisRecord.of(record.getBytes(StandardCharsets.UTF_8)); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java index d433384..a67b89c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java @@ -77,7 +77,7 @@ public class DorisRowConverterTest implements Serializable { .setFieldDelimiter("|") .setFieldNames(new String[]{"f1","f2","f3","f4","f5","f6","f7","f8","f9","f10","f11","f12","f13","f14","f15","f16"}) .build(); - String s = new String(serializer.serialize(rowData).f1); + String s = new String(serializer.serialize(rowData).getRow()); Assert.assertEquals("\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris", s); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java index f1e072e..64c27b7 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java @@ -72,7 +72,7 @@ public class TestJsonDebeziumSchemaSerializer { // insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03'); byte[] serializedValue = serializer.serialize( "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\": [...] - .f1; + .getRow(); Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>() { }); @@ -91,7 +91,7 @@ public class TestJsonDebeziumSchemaSerializer { // update t1 set name='doris-update' WHERE id =1; byte[] serializedValue = serializer.serialize( "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...] - .f1; + .getRow(); Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>() { }); @@ -112,7 +112,7 @@ public class TestJsonDebeziumSchemaSerializer { // update t1 set name='doris-update' WHERE id =1; byte[] serializedValue = serializer.serialize( "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...] - .f1; + .getRow(); String row = new String(serializedValue, StandardCharsets.UTF_8); String[] split = row.split("\n"); Map<String, String> valueMap = objectMapper.readValue(split[1], new TypeReference<Map<String, String>>() { @@ -134,7 +134,7 @@ public class TestJsonDebeziumSchemaSerializer { public void testSerializeDelete() throws IOException { byte[] serializedValue = serializer.serialize( "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"t [...] - .f1; + .getRow(); Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>() { }); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java index 6769a72..d9e9423 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java @@ -58,7 +58,7 @@ public class TestRowDataSerializer { RowDataSerializer.Builder builder = RowDataSerializer.builder(); builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false); RowDataSerializer serializer = builder.build(); - byte[] serializedValue = serializer.serialize(rowData).f1; + byte[] serializedValue = serializer.serialize(rowData).getRow(); Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue); } @@ -67,7 +67,7 @@ public class TestRowDataSerializer { RowDataSerializer.Builder builder = RowDataSerializer.builder(); builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false); RowDataSerializer serializer = builder.build(); - byte[] serializedValue = serializer.serialize(rowData).f1; + byte[] serializedValue = serializer.serialize(rowData).getRow(); ObjectMapper objectMapper = new ObjectMapper(); Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){}); Assert.assertEquals("3", valueMap.get("id")); @@ -80,7 +80,7 @@ public class TestRowDataSerializer { RowDataSerializer.Builder builder = RowDataSerializer.builder(); builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true); RowDataSerializer serializer = builder.build(); - byte[] serializedValue = serializer.serialize(rowData).f1; + byte[] serializedValue = serializer.serialize(rowData).getRow(); Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue); } @@ -89,7 +89,7 @@ public class TestRowDataSerializer { RowDataSerializer.Builder builder = RowDataSerializer.builder(); builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true); RowDataSerializer serializer = builder.build(); - byte[] serializedValue = serializer.serialize(rowData).f1; + byte[] serializedValue = serializer.serialize(rowData).getRow(); ObjectMapper objectMapper = new ObjectMapper(); Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){}); Assert.assertEquals("3", valueMap.get("id")); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java index 4dfe265..1c60e59 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java @@ -56,7 +56,7 @@ public class TestRowSerializer { RowSerializer.Builder builder = RowSerializer.builder(); builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false); RowSerializer serializer = builder.build(); - byte[] serializedValue = serializer.serialize(row).f1; + byte[] serializedValue = serializer.serialize(row).getRow(); Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue); } @@ -65,7 +65,7 @@ public class TestRowSerializer { RowSerializer.Builder builder = RowSerializer.builder(); builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false); RowSerializer serializer = builder.build(); - byte[] serializedValue = serializer.serialize(row).f1; + byte[] serializedValue = serializer.serialize(row).getRow(); ObjectMapper objectMapper = new ObjectMapper(); Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){}); Assert.assertEquals("3", valueMap.get("id")); @@ -78,7 +78,7 @@ public class TestRowSerializer { RowSerializer.Builder builder = RowSerializer.builder(); builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true); RowSerializer serializer = builder.build(); - byte[] serializedValue = serializer.serialize(row).f1; + byte[] serializedValue = serializer.serialize(row).getRow(); Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue); } @@ -87,7 +87,7 @@ public class TestRowSerializer { RowSerializer.Builder builder = RowSerializer.builder(); builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true); RowSerializer serializer = builder.build(); - byte[] serializedValue = serializer.serialize(row).f1; + byte[] serializedValue = serializer.serialize(row).getRow(); ObjectMapper objectMapper = new ObjectMapper(); Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){}); Assert.assertEquals("3", valueMap.get("id")); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org