This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7aa8d4b [Fix] fix flink date and timestamp type not mapping 7aa8d4b is described below commit 7aa8d4bce84eb68a4bf2f842f24fdd6c897ff910 Author: gj-zhang <rancho_zhan...@163.com> AuthorDate: Sun Jun 12 21:56:19 2022 +0800 [Fix] fix flink date and timestamp type not mapping * [FIX] fix flink date and timestamp type not mapping. * The type conversion here is recommended to be abstracted and placed in DorisRowConverter. * overload the constructor for the DorisRowConverter. add unit test for external convert * add license header. --- .../RowDataDeserializationSchema.java | 2 +- .../converter/DorisRowConverter.java | 133 ++++++++++++++++++++- .../apache/doris/flink/serialization/RowBatch.java | 34 +++++- .../doris/flink/sink/writer/RowDataSerializer.java | 15 +-- .../doris/flink/table/DorisRowDataInputFormat.java | 2 +- .../doris/flink/DorisDateAndTimestampSqlTest.java | 66 ++++++++++ .../convert/DorisRowConverterTest.java | 64 +++++++++- .../doris/flink/serialization/TestRowBatch.java | 14 ++- 8 files changed, 303 insertions(+), 27 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java index f9c65b3..3342b66 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java @@ -44,7 +44,7 @@ public class RowDataDeserializationSchema implements DorisDeserializationSchema< @Override public void deserialize(List<?> record, Collector<RowData> out) throws Exception { - RowData row = rowConverter.convert(record); + RowData row = rowConverter.convertInternal(record); out.collect(row); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java index ee7a9a7..d1bb529 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java @@ -21,12 +21,20 @@ import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.ZonedTimestampType; import java.io.Serializable; import java.math.BigDecimal; +import java.sql.Date; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -35,12 +43,26 @@ public class DorisRowConverter implements Serializable { private static final long serialVersionUID = 1L; private final DeserializationConverter[] deserializationConverters; + private final SerializationConverter[] serializationConverters; public DorisRowConverter(RowType rowType) { checkNotNull(rowType); this.deserializationConverters = new DeserializationConverter[rowType.getFieldCount()]; + this.serializationConverters = new SerializationConverter[rowType.getFieldCount()]; for (int i = 0; i < rowType.getFieldCount(); i++) { - deserializationConverters[i] = createNullableConverter(rowType.getTypeAt(i)); + deserializationConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i)); + serializationConverters[i] = createNullableExternalConverter(rowType.getTypeAt(i)); + } + } + + public DorisRowConverter(DataType[] dataTypes) { + checkNotNull(dataTypes); + this.deserializationConverters = new DeserializationConverter[dataTypes.length]; + this.serializationConverters = new SerializationConverter[dataTypes.length]; + for (int i = 0; i < dataTypes.length; i++) { + LogicalType logicalType = dataTypes[i].getLogicalType(); + deserializationConverters[i] = createNullableInternalConverter(logicalType); + serializationConverters[i] = createNullableExternalConverter(logicalType); } } @@ -49,7 +71,7 @@ public class DorisRowConverter implements Serializable { * * @param record from rowBatch */ - public GenericRowData convert(List record){ + public GenericRowData convertInternal(List record) { GenericRowData rowData = new GenericRowData(deserializationConverters.length); for (int i = 0; i < deserializationConverters.length ; i++) { rowData.setField(i, deserializationConverters[i].deserialize(record.get(i))); @@ -57,13 +79,23 @@ public class DorisRowConverter implements Serializable { return rowData; } + /** + * Convert data from {@link RowData} to {@link RowBatch} + * @param rowData record from flink rowdata + * @param index the field index + * @return java type value. + */ + public Object convertExternal(RowData rowData, int index) { + return serializationConverters[index].serialize(index, rowData); + } + /** * Create a nullable runtime {@link DeserializationConverter} from given {@link * LogicalType}. */ - protected DeserializationConverter createNullableConverter(LogicalType type) { - return wrapIntoNullableInternalConverter(createConverter(type)); + protected DeserializationConverter createNullableInternalConverter(LogicalType type) { + return wrapIntoNullableInternalConverter(createInternalConverter(type)); } protected DeserializationConverter wrapIntoNullableInternalConverter( @@ -77,6 +109,20 @@ public class DorisRowConverter implements Serializable { }; } + protected SerializationConverter createNullableExternalConverter(LogicalType type) { + return wrapIntoNullableExternalConverter(createExternalConverter(type)); + } + + protected SerializationConverter wrapIntoNullableExternalConverter(SerializationConverter serializationConverter) { + return (index, val) -> { + if (val == null) { + return null; + } else { + return serializationConverter.serialize(index, val); + } + }; + } + /** Runtime converter to convert doris field to {@link RowData} type object. */ @FunctionalInterface interface DeserializationConverter extends Serializable { @@ -88,7 +134,15 @@ public class DorisRowConverter implements Serializable { Object deserialize(Object field); } - protected DeserializationConverter createConverter(LogicalType type) { + /** + * Runtime converter to convert {@link RowData} type object to doris field. + */ + @FunctionalInterface + interface SerializationConverter extends Serializable { + Object serialize(int index, RowData field); + } + + protected DeserializationConverter createInternalConverter(LogicalType type) { switch (type.getTypeRoot()) { case NULL: return val -> null; @@ -109,7 +163,21 @@ public class DorisRowConverter implements Serializable { case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return val -> { + if (val instanceof LocalDateTime) { + return TimestampData.fromLocalDateTime((LocalDateTime) val); + } else { + throw new UnsupportedOperationException("timestamp type must be java.time.LocalDateTime, the actual type is: " + val.getClass().getName()); + } + }; case DATE: + return val -> { + if (val instanceof LocalDate) { + return (int) ((LocalDate) val).toEpochDay(); + } else { + throw new UnsupportedOperationException("timestamp type must be java.time.LocalDate, the actual type is: " + val.getClass()); + } + }; case CHAR: case VARCHAR: return val -> StringData.fromString((String) val); @@ -125,4 +193,59 @@ public class DorisRowConverter implements Serializable { throw new UnsupportedOperationException("Unsupported type:" + type); } } + + protected SerializationConverter createExternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return ((index, val) -> null); + case CHAR: + case VARCHAR: + return (index, val) -> val.getString(index); + case BOOLEAN: + return (index, val) -> val.getBoolean(index); + case BINARY: + case VARBINARY: + return (index, val) -> val.getBinary(index); + case DECIMAL: + final int decimalPrecision = ((DecimalType) type).getPrecision(); + final int decimalScale = ((DecimalType) type).getScale(); + return (index, val) -> val.getDecimal(index, decimalPrecision, decimalScale); + case TINYINT: + return (index, val) -> val.getByte(index); + case SMALLINT: + return (index, val) -> val.getShort(index); + case INTEGER: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + return (index, val) -> val.getInt(index); + case BIGINT: + return (index, val) -> val.getLong(index); + case FLOAT: + return (index, val) -> val.getFloat(index); + case DOUBLE: + return (index, val) -> val.getDouble(index); + case DATE: + return (index, val) -> Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))); + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); + return (index, val) -> val.getTimestamp(index, timestampPrecision).toTimestamp(); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int localP = ((LocalZonedTimestampType) type).getPrecision(); + return (index, val) -> val.getTimestamp(index, localP).toTimestamp(); + case TIMESTAMP_WITH_TIME_ZONE: + final int zonedP = ((ZonedTimestampType) type).getPrecision(); + return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp(); + case ARRAY: + case MULTISET: + case MAP: + case ROW: + case STRUCTURED_TYPE: + case DISTINCT_TYPE: + case RAW: + case SYMBOL: + case UNRESOLVED: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index 3be6f87..4dd6732 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -42,6 +42,9 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.IOException; import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; @@ -79,6 +82,9 @@ public class RowBatch { private RootAllocator rootAllocator; private final Schema schema; + private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + public List<Row> getRowBatch() { return rowBatch; } @@ -243,8 +249,34 @@ public class RowBatch { } break; case "DATE": - case "LARGEINT": + Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR), + typeMismatchMessage(currentType, mt)); + VarCharVector date = (VarCharVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + if (date.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + continue; + } + String value = new String(date.get(rowIndex)); + LocalDate localDate = LocalDate.parse(value, dateFormatter); + addValueToRow(rowIndex, localDate); + } + break; case "DATETIME": + Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR), + typeMismatchMessage(currentType, mt)); + VarCharVector timeStampSecVector = (VarCharVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + if (timeStampSecVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + continue; + } + String value = new String(timeStampSecVector.get(rowIndex)); + LocalDateTime parse = LocalDateTime.parse(value, dateTimeFormatter); + addValueToRow(rowIndex, parse); + } + break; + case "LARGEINT": case "CHAR": case "VARCHAR": case "STRING": diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java index 4d692c0..5d00301 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.sink.writer; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.deserialization.converter.DorisRowConverter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; @@ -33,18 +34,17 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN; import static org.apache.doris.flink.sink.writer.LoadConstants.JSON; import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE; -import static org.apache.flink.table.data.RowData.createFieldGetter; /** * Serializer for RowData. */ public class RowDataSerializer implements DorisRecordSerializer<RowData> { String[] fieldNames; - RowData.FieldGetter[] fieldGetters; String type; private ObjectMapper objectMapper; private final String fieldDelimiter; private final boolean enableDelete; + private final DorisRowConverter rowConverter; private RowDataSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter, boolean enableDelete) { this.fieldNames = fieldNames; @@ -54,15 +54,12 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> { if (JSON.equals(type)) { objectMapper = new ObjectMapper(); } - this.fieldGetters = new RowData.FieldGetter[dataTypes.length]; - for (int fieldIndex = 0; fieldIndex < dataTypes.length; fieldIndex++) { - fieldGetters[fieldIndex] = createFieldGetter(dataTypes[fieldIndex].getLogicalType(), fieldIndex); - } + this.rowConverter = new DorisRowConverter(dataTypes); } @Override public byte[] serialize(RowData record) throws IOException{ - int maxIndex = Math.min(record.getArity(), fieldGetters.length); + int maxIndex = Math.min(record.getArity(), fieldNames.length); String valString; if (JSON.equals(type)) { valString = buildJsonString(record, maxIndex); @@ -78,7 +75,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> { int fieldIndex = 0; Map<String, String> valueMap = new HashMap<>(); while (fieldIndex < maxIndex) { - Object field = fieldGetters[fieldIndex].getFieldOrNull(record); + Object field = rowConverter.convertExternal(record, fieldIndex); String value = field != null ? field.toString() : null; valueMap.put(fieldNames[fieldIndex], value); fieldIndex++; @@ -93,7 +90,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> { int fieldIndex = 0; StringJoiner joiner = new StringJoiner(fieldDelimiter); while (fieldIndex < maxIndex) { - Object field = fieldGetters[fieldIndex].getFieldOrNull(record); + Object field = rowConverter.convertExternal(record, fieldIndex); String value = field != null ? field.toString() : NULL_VALUE; joiner.add(value); fieldIndex++; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java index fbcaeea..16cf2ee 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java @@ -148,7 +148,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable return null; } List next = (List) scalaValueReader.next(); - RowData genericRowData = rowConverter.convert(next); + RowData genericRowData = rowConverter.convertInternal(next); //update hasNext after we've read the record hasNext = scalaValueReader.hasNext(); return genericRowData; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java new file mode 100644 index 0000000..d135995 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; + +import java.util.UUID; + +public class DorisDateAndTimestampSqlTest { + + public static void main(String[] args) { + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); + tEnv.executeSql("create table test_source ( " + + " id INT, " + + " score DECIMAL(10, 9), " + + " submit_time TIMESTAMP " + + " ) with ( " + + " 'password'='', " + + " 'connector'='doris', " + + " 'fenodes'='FE_HOST:FE_PORT', " + + " 'table.identifier'='db.source_table', " + + " 'username'='root' " + + ")"); + + tEnv.executeSql("create table test_sink ( " + + " id INT, " + + " score DECIMAL(10, 9), " + + " submit_time DATE " + + " ) with ( " + + " 'password'='', " + + " 'connector'='doris', " + + " 'fenodes'='FE_HOST:FE_PORT', " + + " 'sink.label-prefix' = 'label_" + UUID.randomUUID()+"' , " + + " 'table.identifier'='db.sink_table', " + + " 'username'='root' " + + ")"); + tEnv.executeSql( + "insert into " + + " test_sink " + + "select " + + " id, " + + " score," + + " to_date(DATE_FORMAT(submit_time, 'yyyy-MM-dd')) as submit_time " + + "from " + + " test_source " + + "where " + + " submit_time>='2022-05-31 00:00:00'") + .print(); + } + +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java index 3489399..e56a40e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java @@ -17,23 +17,33 @@ package org.apache.doris.flink.deserialization.convert; import org.apache.doris.flink.deserialization.converter.DorisRowConverter; +import org.apache.doris.flink.sink.writer.RowDataSerializer; +import org.apache.doris.flink.sink.writer.RowDataSerializer.Builder; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class DorisRowConverterTest implements Serializable { @Test - public void testConvert(){ + public void testConvert() throws IOException { ResolvedSchema SCHEMA = ResolvedSchema.of( Column.physical("f1", DataTypes.NULL()), @@ -55,8 +65,54 @@ public class DorisRowConverterTest implements Serializable { DorisRowConverter converter = new DorisRowConverter((RowType) SCHEMA.toPhysicalRowDataType().getLogicalType()); - List record = Arrays.asList(null,"true",1.2,1.2345,24,10,1,32,64,128, BigDecimal.valueOf(10.123),"2021-01-01 08:00:00","2021-01-01 08:00:00","2021-01-01","a","doris"); - GenericRowData rowData = converter.convert(record); - Assert.assertEquals("+I(null,true,1.2,1.2345,24,10,1,32,64,128,10.12,2021-01-01 08:00:00,2021-01-01 08:00:00,2021-01-01,a,doris)",rowData.toString()); + LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); + LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); + LocalDate date1 = LocalDate.of(2021, 1, 1); + List record = Arrays.asList(null,true,1.2F,1.2345D,24,10,(byte) 1, (short) 32,64,128L, BigDecimal.valueOf(10.123),time1,time2, date1,"a","doris"); + GenericRowData rowData = converter.convertInternal(record); + + RowDataSerializer serializer = new Builder() + .setFieldType(SCHEMA.getColumnDataTypes().toArray(new DataType[0])) + .setType("csv") + .setFieldDelimiter("|") + .setFieldNames(new String[]{"f1","f2","f3","f4","f5","f6","f7","f8","f9","f10","f11","f12","f13","f14","f15","f16"}) + .build(); + String s = new String(serializer.serialize(rowData)); + Assert.assertEquals("\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris", s); + } + + @Test + public void testExternalConvert() { + ResolvedSchema SCHEMA = + ResolvedSchema.of( + Column.physical("f1", DataTypes.NULL()), + Column.physical("f2", DataTypes.BOOLEAN()), + Column.physical("f3", DataTypes.FLOAT()), + Column.physical("f4", DataTypes.DOUBLE()), + Column.physical("f5", DataTypes.INTERVAL(DataTypes.YEAR())), + Column.physical("f6", DataTypes.INTERVAL(DataTypes.DAY())), + Column.physical("f7", DataTypes.TINYINT()), + Column.physical("f8", DataTypes.SMALLINT()), + Column.physical("f9", DataTypes.INT()), + Column.physical("f10", DataTypes.BIGINT()), + Column.physical("f11", DataTypes.DECIMAL(10,2)), + Column.physical("f12", DataTypes.TIMESTAMP_WITH_TIME_ZONE()), + Column.physical("f13", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()), + Column.physical("f14", DataTypes.DATE()), + Column.physical("f15", DataTypes.CHAR(1)), + Column.physical("f16", DataTypes.VARCHAR(256))); + DorisRowConverter converter = new DorisRowConverter((RowType) SCHEMA.toPhysicalRowDataType().getLogicalType()); + LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); + LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); + LocalDate date1 = LocalDate.of(2021, 1, 1); + GenericRowData rowData = GenericRowData.of(null, true, 1.2F, 1.2345D, 24, 10, (byte) 1, (short) 32, 64, 128L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(10.123), 5, 3), + TimestampData.fromLocalDateTime(time1), TimestampData.fromLocalDateTime(time2), + (int) date1.toEpochDay(), StringData.fromString("a"), StringData.fromString("doris")); + List row = new ArrayList(); + for (int i = 0; i < rowData.getArity(); i++) { + row.add(converter.convertExternal(rowData, i)); + } + Assert.assertEquals("[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:00:00.0, 2021-01-01 08:00:00.0, 2021-01-01, a, doris]", row.toString()); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index 8b66e01..261acbe 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; @@ -247,8 +249,8 @@ public class TestRowBatch { 1L, (float) 1.1, (double) 1.1, - "2008-08-08", - "2008-08-08 00:00:00", + LocalDate.of(2008, 8, 8), + LocalDateTime.of(2008,8,8,0,0,0), DecimalData.fromBigDecimal(new BigDecimal(12.34), 4, 2), "char1" ); @@ -261,8 +263,8 @@ public class TestRowBatch { 2L, (float) 2.2, (double) 2.2, - "1900-08-08", - "1900-08-08 00:00:00", + LocalDate.of(1900, 8, 8), + LocalDateTime.of(1900,8,8,0,0,0), DecimalData.fromBigDecimal(new BigDecimal(88.88), 4, 2), "char2" ); @@ -275,8 +277,8 @@ public class TestRowBatch { 3L, (float) 3.3, (double) 3.3, - "2100-08-08", - "2100-08-08 00:00:00", + LocalDate.of(2100, 8, 8), + LocalDateTime.of(2100,8,8,0,0,0), DecimalData.fromBigDecimal(new BigDecimal(10.22), 4, 2), "char3" ); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org