This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c941b55  [Improve] Optimize the return value of DorisRecordSerializer (#247)
c941b55 is described below

commit c941b55a3505c9d9ce39fd6fd098945628f0aa9c
Author: wudi <676366...@qq.com>
AuthorDate: Wed Nov 29 10:05:36 2023 +0800

    [Improve] Optimize the return value of DorisRecordSerializer (#247)
---
 .../doris/flink/sink/batch/DorisBatchWriter.java   | 17 +++--
 .../doris/flink/sink/writer/DorisWriter.java       | 12 ++--
 .../flink/sink/writer/serializer/DorisRecord.java  | 74 ++++++++++++++++++++++
 .../writer/serializer/DorisRecordSerializer.java   |  4 +-
 .../serializer/JsonDebeziumSchemaSerializer.java   |  6 +-
 .../serializer/RecordWithMetaSerializer.java       |  6 +-
 .../sink/writer/serializer/RowDataSerializer.java  |  5 +-
 .../sink/writer/serializer/RowSerializer.java      |  3 +-
 .../writer/serializer/SimpleStringSerializer.java  |  4 +-
 .../convert/DorisRowConverterTest.java             |  2 +-
 .../writer/TestJsonDebeziumSchemaSerializer.java   |  8 +--
 .../flink/sink/writer/TestRowDataSerializer.java   |  8 +--
 .../doris/flink/sink/writer/TestRowSerializer.java |  8 +--
 13 files changed, 112 insertions(+), 45 deletions(-)
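
For implementers of custom serializers, the practical effect of this change is that
DorisRecordSerializer#serialize now returns a DorisRecord (database, table, row bytes)
instead of a Tuple2<String, byte[]>. A minimal migration sketch for a hypothetical
single-table serializer (the class name is illustrative, not part of this commit):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
    import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;

    // Previously: return Tuple2.of(null, bytes). Now the row bytes are wrapped in a
    // DorisRecord; database/table stay null, so the writer falls back to the table
    // identifier configured in DorisOptions.
    public class MyCsvSerializer implements DorisRecordSerializer<String> {
        @Override
        public DorisRecord serialize(String record) throws IOException {
            return DorisRecord.of(record.getBytes(StandardCharsets.UTF_8));
        }
    }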

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 08a58cd..567758d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -20,11 +20,11 @@ package org.apache.doris.flink.sink.batch;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -93,18 +93,17 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
         checkFlushException();
         String db = this.database;
         String tbl = this.table;
-        Tuple2<String, byte[]> rowTuple = serializer.serialize(in);
-        if(rowTuple == null || rowTuple.f1 == null){
+        DorisRecord record = serializer.serialize(in);
+        if(record == null || record.getRow() == null){
             //ddl or value is null
             return;
         }
         //multi table load
-        if(rowTuple.f0 != null){
-            String[] tableInfo = rowTuple.f0.split("\\.");
-            db = tableInfo[0];
-            tbl = tableInfo[1];
+        if(record.getTableIdentifier() != null){
+            db = record.getDatabase();
+            tbl = record.getTable();
         }
-        batchStreamLoad.writeRecord(db, tbl, rowTuple.f1);
+        batchStreamLoad.writeRecord(db, tbl, record.getRow());
     }
     @Override
     public void flush(boolean flush) throws IOException, InterruptedException {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index cb330c3..d5e5bc1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -27,12 +27,12 @@ import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.slf4j.Logger;
@@ -152,14 +152,14 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
         checkLoadException();
         String tableKey = dorisOptions.getTableIdentifier();
 
-        Tuple2<String, byte[]> rowTuple = serializer.serialize(in);
-        if(rowTuple == null || rowTuple.f1 == null){
+        DorisRecord record = serializer.serialize(in);
+        if(record == null || record.getRow() == null){
             //ddl or value is null
             return;
         }
         //multi table load
-        if(rowTuple.f0 != null){
-            tableKey = rowTuple.f0;
+        if(record.getTableIdentifier() != null){
+            tableKey = record.getTableIdentifier();
         }
 
         DorisStreamLoad streamLoader = getStreamLoader(tableKey);
@@ -171,7 +171,7 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
             loadingMap.put(tableKey, true);
             globalLoading = true;
         }
-        streamLoader.writeRecord(rowTuple.f1);
+        streamLoader.writeRecord(record.getRow());
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
new file mode 100644
index 0000000..02ae3bc
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.sink.writer.serializer;
+
+import java.io.Serializable;
+
+public class DorisRecord implements Serializable {
+    private String database;
+    private String table;
+    private byte[] row;
+
+    public DorisRecord() {
+    }
+
+    public DorisRecord(String database, String table, byte[] row) {
+        this.database = database;
+        this.table = table;
+        this.row = row;
+    }
+
+    public String getTableIdentifier(){
+        if(database == null || table == null){
+            return null;
+        }
+        return database + "." + table;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public byte[] getRow() {
+        return row;
+    }
+
+    public void setRow(byte[] row) {
+        this.row = row;
+    }
+
+    public static DorisRecord of(String database, String table, byte[] row) {
+        return new DorisRecord(database, table, row);
+    }
+
+    public static DorisRecord of(byte[] row) {
+        return new DorisRecord(null, null, row);
+    }
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
index aa2b63d..8c699f6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.flink.sink.writer.serializer;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-
 import java.io.IOException;
 import java.io.Serializable;
 
@@ -34,6 +32,6 @@ public interface DorisRecordSerializer<T> extends Serializable {
      * @return [tableIdentifer,byte array]
      * @throws IOException
      */
-    Tuple2<String, byte[]> serialize(T record) throws IOException;
+    DorisRecord serialize(T record) throws IOException;
 
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 37390ce..4dc654e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -121,7 +121,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     @Override
-    public Tuple2<String, byte[]> serialize(String record) throws IOException {
+    public DorisRecord serialize(String record) throws IOException {
         LOG.debug("received debezium json data {} :", record);
         JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
         String op = extractJsonNode(recordRoot, "op");
@@ -146,7 +146,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
                 addDeleteSign(valueMap, false);
                 break;
             case OP_UPDATE:
-                return Tuple2.of(null, extractUpdate(recordRoot));
+                return DorisRecord.of(extractUpdate(recordRoot));
             case OP_DELETE:
                 valueMap = extractBeforeRow(recordRoot);
                 addDeleteSign(valueMap, true);
@@ -155,7 +155,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
                 LOG.error("parse record fail, unknown op {} in {}", op, record);
                 return null;
         }
-        return Tuple2.of(null, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
+        return DorisRecord.of(objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
     }
 
     /**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java
index 21608a7..74e2853 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RecordWithMetaSerializer.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink.sink.writer.serializer;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.sink.batch.RecordWithMeta;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,7 +29,7 @@ public class RecordWithMetaSerializer implements DorisRecordSerializer<RecordWit
     private static final Logger LOG = LoggerFactory.getLogger(RecordWithMetaSerializer.class);
 
     @Override
-    public Tuple2<String, byte[]> serialize(RecordWithMeta record) throws IOException {
+    public DorisRecord serialize(RecordWithMeta record) throws IOException {
         if(StringUtils.isBlank(record.getTable())
                 || StringUtils.isBlank(record.getDatabase())
                 || record.getRecord() == null){
@@ -38,7 +37,6 @@ public class RecordWithMetaSerializer implements DorisRecordSerializer<RecordWit
                     record.getDatabase(), record.getTable(), record.getRecord());
             return null;
         }
-        String tableKey = record.getDatabase() + "." + record.getTable();
-        return Tuple2.of(tableKey, record.getRecord().getBytes(StandardCharsets.UTF_8));
+        return DorisRecord.of(record.getDatabase(), record.getTable(), record.getRecord().getBytes(StandardCharsets.UTF_8));
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
index d29830e..a9830a5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.sink.writer.serializer;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
 import org.apache.doris.flink.sink.EscapeHandler;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
@@ -60,7 +59,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
     }
 
     @Override
-    public Tuple2<String, byte[]> serialize(RowData record) throws IOException{
+    public DorisRecord serialize(RowData record) throws IOException{
         int maxIndex = Math.min(record.getArity(), fieldNames.length);
         String valString;
         if (JSON.equals(type)) {
@@ -70,7 +69,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
         } else {
             throw new IllegalArgumentException("The type " + type + " is not supported!");
         }
-        return Tuple2.of(null, valString.getBytes(StandardCharsets.UTF_8));
+        return DorisRecord.of(valString.getBytes(StandardCharsets.UTF_8));
     }
 
     public String buildJsonString(RowData record, int maxIndex) throws IOException {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java
index 3a0e610..a3cf61c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowSerializer.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.flink.sink.writer.serializer;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.RowRowConverter;
@@ -55,7 +54,7 @@ public class RowSerializer implements DorisRecordSerializer<Row> {
     }
 
     @Override
-    public Tuple2<String, byte[]> serialize(Row record) throws IOException{
+    public DorisRecord serialize(Row record) throws IOException{
         RowData rowDataRecord = this.rowRowConverter.toInternal(record);
         return this.rowDataSerializer.serialize(rowDataRecord);
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java
index 206e641..aa2ad85 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/SimpleStringSerializer.java
@@ -28,7 +28,7 @@ import java.nio.charset.StandardCharsets;
 public class SimpleStringSerializer implements DorisRecordSerializer<String> {
 
     @Override
-    public Tuple2<String, byte[]> serialize(String record) throws IOException {
-        return Tuple2.of(null, record.getBytes(StandardCharsets.UTF_8));
+    public DorisRecord serialize(String record) throws IOException {
+        return DorisRecord.of(record.getBytes(StandardCharsets.UTF_8));
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index d433384..a67b89c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -77,7 +77,7 @@ public class DorisRowConverterTest implements Serializable {
                 .setFieldDelimiter("|")
                 .setFieldNames(new String[]{"f1","f2","f3","f4","f5","f6","f7","f8","f9","f10","f11","f12","f13","f14","f15","f16"})
                 .build();
-        String s = new String(serializer.serialize(rowData).f1);
+        String s = new String(serializer.serialize(rowData).getRow());
         Assert.assertEquals("\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris", s);
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index f1e072e..64c27b7 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -72,7 +72,7 @@ public class TestJsonDebeziumSchemaSerializer {
         // insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
         byte[] serializedValue = serializer.serialize(
                 "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\": [...]
-                .f1;
+                .getRow();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
                 new TypeReference<Map<String, String>>() {
                 });
@@ -91,7 +91,7 @@ public class TestJsonDebeziumSchemaSerializer {
         // update t1 set name='doris-update' WHERE id =1;
         byte[] serializedValue = serializer.serialize(
                 "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...]
-                .f1;
+                .getRow();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
                 new TypeReference<Map<String, String>>() {
                 });
@@ -112,7 +112,7 @@ public class TestJsonDebeziumSchemaSerializer {
         // update t1 set name='doris-update' WHERE id =1;
         byte[] serializedValue = serializer.serialize(
                 "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...]
-                .f1;
+                .getRow();
         String row = new String(serializedValue, StandardCharsets.UTF_8);
         String[] split = row.split("\n");
         Map<String, String> valueMap = objectMapper.readValue(split[1], new TypeReference<Map<String, String>>() {
@@ -134,7 +134,7 @@ public class TestJsonDebeziumSchemaSerializer {
     public void testSerializeDelete() throws IOException {
         byte[] serializedValue = serializer.serialize(
                 "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"t [...]
-                .f1;
+                .getRow();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
                 new TypeReference<Map<String, String>>() {
                 });
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
index 6769a72..d9e9423 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
@@ -58,7 +58,7 @@ public class TestRowDataSerializer {
         RowDataSerializer.Builder builder = RowDataSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false);
         RowDataSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(rowData).f1;
+        byte[] serializedValue = serializer.serialize(rowData).getRow();
         Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue);
     }
 
@@ -67,7 +67,7 @@ public class TestRowDataSerializer {
         RowDataSerializer.Builder builder = RowDataSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false);
         RowDataSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(rowData).f1;
+        byte[] serializedValue = serializer.serialize(rowData).getRow();
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
         Assert.assertEquals("3", valueMap.get("id"));
@@ -80,7 +80,7 @@ public class TestRowDataSerializer {
         RowDataSerializer.Builder builder = RowDataSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true);
         RowDataSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(rowData).f1;
+        byte[] serializedValue = serializer.serialize(rowData).getRow();
         Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue);
     }
 
@@ -89,7 +89,7 @@ public class TestRowDataSerializer {
         RowDataSerializer.Builder builder = RowDataSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true);
         RowDataSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(rowData).f1;
+        byte[] serializedValue = serializer.serialize(rowData).getRow();
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
         Assert.assertEquals("3", valueMap.get("id"));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
index 4dfe265..1c60e59 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
@@ -56,7 +56,7 @@ public class TestRowSerializer {
         RowSerializer.Builder builder = RowSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false);
         RowSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(row).f1;
+        byte[] serializedValue = serializer.serialize(row).getRow();
         Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue);
     }
 
@@ -65,7 +65,7 @@ public class TestRowSerializer {
         RowSerializer.Builder builder = RowSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false);
         RowSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(row).f1;
+        byte[] serializedValue = serializer.serialize(row).getRow();
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
         Assert.assertEquals("3", valueMap.get("id"));
@@ -78,7 +78,7 @@ public class TestRowSerializer {
         RowSerializer.Builder builder = RowSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true);
         RowSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(row).f1;
+        byte[] serializedValue = serializer.serialize(row).getRow();
         Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue);
     }
 
@@ -87,7 +87,7 @@ public class TestRowSerializer {
         RowSerializer.Builder builder = RowSerializer.builder();
         builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true);
         RowSerializer serializer = builder.build();
-        byte[] serializedValue = serializer.serialize(row).f1;
+        byte[] serializedValue = serializer.serialize(row).getRow();
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
         Assert.assertEquals("3", valueMap.get("id"));
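
For multi-table loads, the table routing that previously travelled in Tuple2.f0 as a
"db.table" string (and was split on "." by DorisBatchWriter) is now carried as explicit
database/table fields on DorisRecord, which the writers read back via
getTableIdentifier()/getDatabase()/getTable(). A minimal sketch of a multi-table
serializer under the new interface, mirroring the RecordWithMetaSerializer change above
(the class name is illustrative, not part of this commit):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.doris.flink.sink.batch.RecordWithMeta;
    import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
    import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;

    public class MyMultiTableSerializer implements DorisRecordSerializer<RecordWithMeta> {
        @Override
        public DorisRecord serialize(RecordWithMeta record) throws IOException {
            // The routing metadata rides on the DorisRecord itself, so the writer
            // targets record.getDatabase()/record.getTable() instead of its
            // statically configured table identifier.
            return DorisRecord.of(
                    record.getDatabase(),
                    record.getTable(),
                    record.getRecord().getBytes(StandardCharsets.UTF_8));
        }
    }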


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
